Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Yun-Tang Hsu <hsuy@vmware.com>
  • Loading branch information
yuntanghsu committed Nov 17, 2022
1 parent 61ed95e commit 5df9906
Show file tree
Hide file tree
Showing 11 changed files with 36 additions and 35 deletions.
6 changes: 3 additions & 3 deletions cmd/theia-manager/theia-manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func createAPIServerConfig(
cipherSuites []uint16,
tlsMinVersion uint16,
nprq querier.NPRecommendationQuerier,
chq querier.ClickHouseStatusQuerier,
chq querier.ClickHouseStatQuerier,
) (*apiserver.Config, error) {
secureServing := genericoptions.NewSecureServingOptions().WithLoopback()
authentication := genericoptions.NewDelegatingAuthenticationOptions()
Expand Down Expand Up @@ -127,7 +127,7 @@ func run(o *Options) error {
npRecommendationInformer := crdInformerFactory.Crd().V1alpha1().NetworkPolicyRecommendations()
recommendedNPInformer := crdInformerFactory.Crd().V1alpha1().RecommendedNetworkPolicies()
npRecoController := networkpolicyrecommendation.NewNPRecommendationController(crdClient, kubeClient, npRecommendationInformer, recommendedNPInformer)
clickHouseController := stats.NewStatsController(kubeClient)
clickHouseStatQuerierImpl := stats.NewClickHouseStatQuerierImpl(kubeClient)

cipherSuites, err := cipher.GenerateCipherSuitesList(o.config.APIServer.TLSCipherSuites)
if err != nil {
Expand All @@ -141,7 +141,7 @@ func run(o *Options) error {
cipherSuites,
cipher.TLSVersionMap[o.config.APIServer.TLSMinVersion],
npRecoController,
clickHouseController)
clickHouseStatQuerierImpl)
if err != nil {
return fmt.Errorf("error creating API server config: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/stats/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ type ClickHouseStats struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`

Result [][]string `json:"jobType,omitempty"`
Stat [][]string `json:"Stat,omitempty"`
}
10 changes: 5 additions & 5 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type ExtraConfig struct {
k8sClient kubernetes.Interface
caCertController *certificate.CACertController
npRecommendationQuerier querier.NPRecommendationQuerier
clickHouseStatusQuerier querier.ClickHouseStatusQuerier
clickHouseStatQuerier querier.ClickHouseStatQuerier
}

// Config defines the config for Theia manager apiserver.
Expand All @@ -85,7 +85,7 @@ type TheiaManagerAPIServer struct {
GenericAPIServer *genericapiserver.GenericAPIServer
caCertController *certificate.CACertController
NPRecommendationQuerier querier.NPRecommendationQuerier
ClickHouseStatusQuerier querier.ClickHouseStatusQuerier
ClickHouseStatusQuerier querier.ClickHouseStatQuerier
}

func (s *TheiaManagerAPIServer) Run(ctx context.Context) error {
Expand All @@ -102,15 +102,15 @@ func NewConfig(
k8sClient kubernetes.Interface,
caCertController *certificate.CACertController,
npRecommendationQuerier querier.NPRecommendationQuerier,
clickHouseStatusQuerier querier.ClickHouseStatusQuerier,
clickHouseStatQuerier querier.ClickHouseStatQuerier,
) *Config {
return &Config{
genericConfig: genericConfig,
extraConfig: ExtraConfig{
k8sClient: k8sClient,
caCertController: caCertController,
npRecommendationQuerier: npRecommendationQuerier,
clickHouseStatusQuerier: clickHouseStatusQuerier,
clickHouseStatQuerier: clickHouseStatQuerier,
},
}
}
Expand Down Expand Up @@ -150,7 +150,7 @@ func (c Config) New() (*TheiaManagerAPIServer, error) {
GenericAPIServer: s,
caCertController: c.extraConfig.caCertController,
NPRecommendationQuerier: c.extraConfig.npRecommendationQuerier,
ClickHouseStatusQuerier: c.extraConfig.clickHouseStatusQuerier,
ClickHouseStatusQuerier: c.extraConfig.clickHouseStatQuerier,
}
if err := installAPIGroup(apiServer); err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/apiserver/registry/stats/clickhouse/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ const defaultNameSpace = "flow-visibility"

// REST implements rest.Storage for clickhouse.
type REST struct {
clickHouseStatusQuerier querier.ClickHouseStatusQuerier
clickHouseStatusQuerier querier.ClickHouseStatQuerier
}

var (
_ rest.Getter = &REST{}
)

// NewREST returns a REST object that will work against API services.
func NewREST(chq querier.ClickHouseStatusQuerier) *REST {
func NewREST(chq querier.ClickHouseStatQuerier) *REST {
return &REST{clickHouseStatusQuerier: chq}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/networkpolicyrecommendation/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func TestNPRecommendation(t *testing.T) {
// the SparkApplication id.
if !serviceCreated {
// Mock ClickHouse database
util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) {
util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true))
if err != nil {
return db, err
Expand Down Expand Up @@ -345,7 +345,7 @@ func TestNPRecommendation(t *testing.T) {
assert.Equal(t, 1, len(nprList), "Expected exactly one NetworkPolicyRecommendation, got %d", len(nprList))
assert.Equal(t, npr, nprList[0])

util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) {
util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true))
if err != nil {
return db, err
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) {
setup: func(client kubernetes.Interface) {
createClickHouseService(client)
createClickHouseSecret(client)
util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) {
util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) {
return nil, fmt.Errorf("connection error")
}
},
Expand All @@ -619,7 +619,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) {
setup: func(client kubernetes.Interface) {
createClickHouseService(client)
createClickHouseSecret(client)
util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) {
util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) {
var err error
db, _, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true))
return db, err
Expand All @@ -632,7 +632,7 @@ func TestGetPolicyRecommendationResult(t *testing.T) {
setup: func(client kubernetes.Interface) {
createClickHouseService(client)
createClickHouseSecret(client)
util.OpenSql = func(driverName, dataSourceName string) (*sql.DB, error) {
util.SqlOpenFunc = func(driverName, dataSourceName string) (*sql.DB, error) {
var err error
var mock sqlmock.Sqlmock
db, mock, err = sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), sqlmock.MonitorPingsOption(true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,53 +128,53 @@ ORDER BY count()
DESC SETTINGS allow_introspection_functions=1`,
}

type StatsController struct {
type ClickHouseStatQuerierImpl struct {
kubeClient kubernetes.Interface
clickhouseConnect *sql.DB
}

func NewStatsController(
func NewClickHouseStatQuerierImpl(
kubeClient kubernetes.Interface,
) *StatsController {
c := &StatsController{
) *ClickHouseStatQuerierImpl {
c := &ClickHouseStatQuerierImpl{
kubeClient: kubeClient,
}
return c
}

func (c *StatsController) GetDiskInfo(namespace string) ([][]string, error) {
func (c *ClickHouseStatQuerierImpl) GetDiskInfo(namespace string) ([][]string, error) {
data, err := c.getDataFromClickHouse(diskQuery, namespace)
if err != nil {
return nil, fmt.Errorf("error when getting diskInfo from clickhouse: %v", err)
}
return data, nil
}

func (c *StatsController) GetTableInfo(namespace string) ([][]string, error) {
func (c *ClickHouseStatQuerierImpl) GetTableInfo(namespace string) ([][]string, error) {
data, err := c.getDataFromClickHouse(tableInfoQuery, namespace)
if err != nil {
return nil, fmt.Errorf("error when getting tableInfo from clickhouse: %v", err)
}
return data, nil
}

func (c *StatsController) GetInsertRate(namespace string) ([][]string, error) {
func (c *ClickHouseStatQuerierImpl) GetInsertRate(namespace string) ([][]string, error) {
data, err := c.getDataFromClickHouse(insertRateQuery, namespace)
if err != nil {
return nil, fmt.Errorf("error when getting insertRate from clickhouse: %v", err)
}
return data, nil
}

func (c *StatsController) GetStackTraces(namespace string) ([][]string, error) {
func (c *ClickHouseStatQuerierImpl) GetStackTraces(namespace string) ([][]string, error) {
data, err := c.getDataFromClickHouse(stackTracesQuery, namespace)
if err != nil {
return nil, fmt.Errorf("error when getting stackTraces from clickhouse: %v", err)
}
return data, nil
}

func (c *StatsController) getDataFromClickHouse(query int, namespace string) ([][]string, error) {
func (c *ClickHouseStatQuerierImpl) getDataFromClickHouse(query int, namespace string) ([][]string, error) {
var err error
if c.clickhouseConnect == nil {
c.clickhouseConnect, err = util.SetupClickHouseConnection(c.kubeClient, namespace)
Expand All @@ -184,6 +184,7 @@ func (c *StatsController) getDataFromClickHouse(query int, namespace string) ([]
}
result, err := c.clickhouseConnect.Query(queryMap[query])
if err != nil {
c.clickhouseConnect = nil
return nil, fmt.Errorf("failed to get data from clickhouse: %v", err)
}
defer result.Close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestGetDataFromClickHouse(t *testing.T) {
db, mock, err := sqlmock.New()
assert.NoError(t, err)
mock.ExpectQuery(regexp.QuoteMeta(queryMap[tc.query])).WillReturnRows(tc.returnedRow)
controller := StatsController{clickhouseConnect: db}
controller := ClickHouseStatQuerierImpl{clickhouseConnect: db}
result, err := controller.getDataFromClickHouse(tc.query, config.FlowVisibilityNS)

if tc.expectedError == nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type NPRecommendationQuerier interface {
CreateNetworkPolicyRecommendation(namespace string, networkPolicyRecommendation *v1alpha1.NetworkPolicyRecommendation) (*v1alpha1.NetworkPolicyRecommendation, error)
}

type ClickHouseStatusQuerier interface {
type ClickHouseStatQuerier interface {
GetDiskInfo(namespace string) ([][]string, error)
GetTableInfo(namespace string) ([][]string, error)
GetInsertRate(namespace string) ([][]string, error)
Expand Down
4 changes: 2 additions & 2 deletions pkg/theia/commands/clickhouse_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ func getStatus(cmd *cobra.Command, args []string) error {
return fmt.Errorf("error when getting clickhouse %v status: %s", name, err)
}
if name == "stackTraces" {
TableOutputVertical(data.Result)
TableOutputVertical(data.Stat)
} else {
TableOutput(data.Result)
TableOutput(data.Stat)
}
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/theia/commands/clickhouse_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestGetStatus(t *testing.T) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/diskInfo"):
status := &stats.ClickHouseStats{
Result: [][]string{{"test_diskInfo"}},
Stat: [][]string{{"test_diskInfo"}},
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Expand All @@ -63,7 +63,7 @@ func TestGetStatus(t *testing.T) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/tableInfo"):
status := &stats.ClickHouseStats{
Result: [][]string{{"test_tableInfo"}},
Stat: [][]string{{"test_tableInfo"}},
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Expand All @@ -80,7 +80,7 @@ func TestGetStatus(t *testing.T) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/insertRate"):
status := &stats.ClickHouseStats{
Result: [][]string{{"test_insertRate"}},
Stat: [][]string{{"test_insertRate"}},
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Expand All @@ -97,7 +97,7 @@ func TestGetStatus(t *testing.T) {
switch strings.TrimSpace(r.URL.Path) {
case fmt.Sprintf("/apis/stats.theia.antrea.io/v1alpha1/clickhouse/stackTraces"):
status := &stats.ClickHouseStats{
Result: [][]string{{"test_stackTraces"}, {"fakeData"}},
Stat: [][]string{{"test_stackTraces"}, {"fakeData"}},
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
Expand Down
4 changes: 2 additions & 2 deletions pkg/util/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"k8s.io/client-go/kubernetes"
)

var OpenSql = sql.Open
var SqlOpenFunc = sql.Open

func ParseRecommendationName(npName string) error {
if !strings.HasPrefix(npName, "pr-") {
Expand Down Expand Up @@ -80,7 +80,7 @@ func ConnectClickHouse(url string) (*sql.DB, error) {
var connect *sql.DB
// Open the database and ping it
var err error
connect, err = OpenSql("clickhouse", url)
connect, err = SqlOpenFunc("clickhouse", url)
if err != nil {
return connect, fmt.Errorf("failed to open ClickHouse: %v", err)
}
Expand Down

0 comments on commit 5df9906

Please sign in to comment.