Skip to content

Commit

Permalink
cherry pick pingcap#2988 to release-1.1 (pingcap#2994)
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

Co-authored-by: Yecheng Fu <fuyecheng@pingcap.com>
  • Loading branch information
ti-srebot and cofyc authored Jul 21, 2020
1 parent 5c8f7cd commit cff6d3f
Show file tree
Hide file tree
Showing 7 changed files with 561 additions and 365 deletions.
2 changes: 1 addition & 1 deletion pkg/controller/tidb_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"time"

"github.com/pingcap/tidb-operator/pkg/apis/pingcap/v1alpha1"
"github.com/pingcap/tidb-operator/pkg/httputil"
"github.com/pingcap/tidb-operator/pkg/util"
httputil "github.com/pingcap/tidb-operator/pkg/util/http"
"github.com/pingcap/tidb/config"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down
252 changes: 252 additions & 0 deletions pkg/pdapi/fake_pdapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pdapi

import (
"fmt"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
)

type ActionType string

const (
GetHealthActionType ActionType = "GetHealth"
GetConfigActionType ActionType = "GetConfig"
GetClusterActionType ActionType = "GetCluster"
GetMembersActionType ActionType = "GetMembers"
GetStoresActionType ActionType = "GetStores"
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
GetStoreActionType ActionType = "GetStore"
DeleteStoreActionType ActionType = "DeleteStore"
SetStoreStateActionType ActionType = "SetStoreState"
DeleteMemberByIDActionType ActionType = "DeleteMemberByID"
DeleteMemberActionType ActionType = "DeleteMember "
SetStoreLabelsActionType ActionType = "SetStoreLabels"
UpdateReplicationActionType ActionType = "UpdateReplicationConfig"
BeginEvictLeaderActionType ActionType = "BeginEvictLeader"
EndEvictLeaderActionType ActionType = "EndEvictLeader"
GetEvictLeaderSchedulersActionType ActionType = "GetEvictLeaderSchedulers"
GetPDLeaderActionType ActionType = "GetPDLeader"
TransferPDLeaderActionType ActionType = "TransferPDLeader"
)

type NotFoundReaction struct {
actionType ActionType
}

func (nfr *NotFoundReaction) Error() string {
return fmt.Sprintf("not found %s reaction. Please add the reaction", nfr.actionType)
}

type Action struct {
ID uint64
Name string
Labels map[string]string
Replication PDReplicationConfig
}

type Reaction func(action *Action) (interface{}, error)

// FakePDClient implements a fake version of PDClient.
type FakePDClient struct {
reactions map[ActionType]Reaction
}

func NewFakePDClient() *FakePDClient {
return &FakePDClient{reactions: map[ActionType]Reaction{}}
}

func (pc *FakePDClient) AddReaction(actionType ActionType, reaction Reaction) {
pc.reactions[actionType] = reaction
}

// fakeAPI is a small helper for fake API calls
func (pc *FakePDClient) fakeAPI(actionType ActionType, action *Action) (interface{}, error) {
if reaction, ok := pc.reactions[actionType]; ok {
result, err := reaction(action)
if err != nil {
return nil, err
}
return result, nil
}
return nil, &NotFoundReaction{actionType}
}

func (pc *FakePDClient) GetHealth() (*HealthInfo, error) {
action := &Action{}
result, err := pc.fakeAPI(GetHealthActionType, action)
if err != nil {
return nil, err
}
return result.(*HealthInfo), nil
}

func (pc *FakePDClient) GetConfig() (*PDConfigFromAPI, error) {
action := &Action{}
result, err := pc.fakeAPI(GetConfigActionType, action)
if err != nil {
return nil, err
}
return result.(*PDConfigFromAPI), nil
}

func (pc *FakePDClient) GetCluster() (*metapb.Cluster, error) {
action := &Action{}
result, err := pc.fakeAPI(GetClusterActionType, action)
if err != nil {
return nil, err
}
return result.(*metapb.Cluster), nil
}

func (pc *FakePDClient) GetMembers() (*MembersInfo, error) {
action := &Action{}
result, err := pc.fakeAPI(GetMembersActionType, action)
if err != nil {
return nil, err
}
return result.(*MembersInfo), nil
}

func (pc *FakePDClient) GetStores() (*StoresInfo, error) {
action := &Action{}
result, err := pc.fakeAPI(GetStoresActionType, action)
if err != nil {
return nil, err
}
return result.(*StoresInfo), nil
}

func (pc *FakePDClient) GetTombStoneStores() (*StoresInfo, error) {
action := &Action{}
result, err := pc.fakeAPI(GetTombStoneStoresActionType, action)
if err != nil {
return nil, err
}
return result.(*StoresInfo), nil
}

func (pc *FakePDClient) GetStore(id uint64) (*StoreInfo, error) {
action := &Action{
ID: id,
}
result, err := pc.fakeAPI(GetStoreActionType, action)
if err != nil {
return nil, err
}
return result.(*StoreInfo), nil
}

func (pc *FakePDClient) DeleteStore(id uint64) error {
if reaction, ok := pc.reactions[DeleteStoreActionType]; ok {
action := &Action{ID: id}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) SetStoreState(id uint64, state string) error {
if reaction, ok := pc.reactions[SetStoreStateActionType]; ok {
action := &Action{ID: id}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) DeleteMemberByID(id uint64) error {
if reaction, ok := pc.reactions[DeleteMemberByIDActionType]; ok {
action := &Action{ID: id}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) DeleteMember(name string) error {
if reaction, ok := pc.reactions[DeleteMemberActionType]; ok {
action := &Action{Name: name}
_, err := reaction(action)
return err
}
return nil
}

// SetStoreLabels sets TiKV labels
func (pc *FakePDClient) SetStoreLabels(storeID uint64, labels map[string]string) (bool, error) {
if reaction, ok := pc.reactions[SetStoreLabelsActionType]; ok {
action := &Action{ID: storeID, Labels: labels}
result, err := reaction(action)
return result.(bool), err
}
return true, nil
}

// UpdateReplicationConfig updates the replication config
func (pc *FakePDClient) UpdateReplicationConfig(config PDReplicationConfig) error {
if reaction, ok := pc.reactions[UpdateReplicationActionType]; ok {
action := &Action{Replication: config}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) BeginEvictLeader(storeID uint64) error {
if reaction, ok := pc.reactions[BeginEvictLeaderActionType]; ok {
action := &Action{ID: storeID}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) EndEvictLeader(storeID uint64) error {
if reaction, ok := pc.reactions[EndEvictLeaderActionType]; ok {
action := &Action{ID: storeID}
_, err := reaction(action)
return err
}
return nil
}

func (pc *FakePDClient) GetEvictLeaderSchedulers() ([]string, error) {
if reaction, ok := pc.reactions[GetEvictLeaderSchedulersActionType]; ok {
action := &Action{}
result, err := reaction(action)
return result.([]string), err
}
return nil, nil
}

func (pc *FakePDClient) GetPDLeader() (*pdpb.Member, error) {
if reaction, ok := pc.reactions[GetPDLeaderActionType]; ok {
action := &Action{}
result, err := reaction(action)
return result.(*pdpb.Member), err
}
return nil, nil
}

func (pc *FakePDClient) TransferPDLeader(memberName string) error {
if reaction, ok := pc.reactions[TransferPDLeaderActionType]; ok {
action := &Action{Name: memberName}
_, err := reaction(action)
return err
}
return nil
}
135 changes: 135 additions & 0 deletions pkg/pdapi/pd_control.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package pdapi

import (
"crypto/tls"
"fmt"
"net/http"
"sync"

"k8s.io/client-go/kubernetes"
"k8s.io/klog"
)

// Namespace is a newtype of a string
type Namespace string

// PDControlInterface is an interface that knows how to manage and get tidb cluster's PD client
type PDControlInterface interface {
// GetPDClient provides PDClient of the tidb cluster.
GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient
// GetPDEtcdClient provides PD etcd Client of the tidb cluster.
GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error)
}

// defaultPDControl is the default implementation of PDControlInterface.
type defaultPDControl struct {
mutex sync.Mutex
etcdmutex sync.Mutex
kubeCli kubernetes.Interface
pdClients map[string]PDClient
pdEtcdClients map[string]PDEtcdClient
}

// NewDefaultPDControl returns a defaultPDControl instance
func NewDefaultPDControl(kubeCli kubernetes.Interface) PDControlInterface {
return &defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}, pdEtcdClients: map[string]PDEtcdClient{}}
}

func (pdc *defaultPDControl) GetPDEtcdClient(namespace Namespace, tcName string, tlsEnabled bool) (PDEtcdClient, error) {
pdc.etcdmutex.Lock()
defer pdc.etcdmutex.Unlock()

var tlsConfig *tls.Config
var err error

if tlsEnabled {
tlsConfig, err = GetTLSConfig(pdc.kubeCli, namespace, tcName, nil)
if err != nil {
klog.Errorf("Unable to get tls config for tidb cluster %q, pd etcd client may not work: %v", tcName, err)
return nil, err
}
return NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, tlsConfig)
}
key := pdEtcdClientKey(namespace, tcName)
if _, ok := pdc.pdEtcdClients[key]; !ok {
pdetcdClient, err := NewPdEtcdClient(PDEtcdClientURL(namespace, tcName), DefaultTimeout, nil)
if err != nil {
return nil, err
}
pdc.pdEtcdClients[key] = pdetcdClient
}
return pdc.pdEtcdClients[key], nil
}

// GetPDClient provides a PDClient of real pd cluster,if the PDClient not existing, it will create new one.
func (pdc *defaultPDControl) GetPDClient(namespace Namespace, tcName string, tlsEnabled bool) PDClient {
pdc.mutex.Lock()
defer pdc.mutex.Unlock()

var tlsConfig *tls.Config
var err error
var scheme = "http"

if tlsEnabled {
scheme = "https"
tlsConfig, err = GetTLSConfig(pdc.kubeCli, namespace, tcName, nil)
if err != nil {
klog.Errorf("Unable to get tls config for tidb cluster %q, pd client may not work: %v", tcName, err)
return &pdClient{url: PdClientURL(namespace, tcName, scheme), httpClient: &http.Client{Timeout: DefaultTimeout}}
}

return NewPDClient(PdClientURL(namespace, tcName, scheme), DefaultTimeout, tlsConfig)
}

key := pdClientKey(scheme, namespace, tcName)
if _, ok := pdc.pdClients[key]; !ok {
pdc.pdClients[key] = NewPDClient(PdClientURL(namespace, tcName, scheme), DefaultTimeout, nil)
}
return pdc.pdClients[key]
}

// pdClientKey returns the pd client key
func pdClientKey(scheme string, namespace Namespace, clusterName string) string {
return fmt.Sprintf("%s.%s.%s", scheme, clusterName, string(namespace))
}

func pdEtcdClientKey(namespace Namespace, clusterName string) string {
return fmt.Sprintf("%s.%s", clusterName, string(namespace))
}

// pdClientUrl builds the url of pd client
func PdClientURL(namespace Namespace, clusterName string, scheme string) string {
return fmt.Sprintf("%s://%s-pd.%s:2379", scheme, clusterName, string(namespace))
}

func PDEtcdClientURL(namespace Namespace, clusterName string) string {
return fmt.Sprintf("%s-pd.%s:2379", clusterName, string(namespace))
}

// FakePDControl implements a fake version of PDControlInterface.
type FakePDControl struct {
defaultPDControl
}

func NewFakePDControl(kubeCli kubernetes.Interface) *FakePDControl {
return &FakePDControl{
defaultPDControl{kubeCli: kubeCli, pdClients: map[string]PDClient{}},
}
}

func (fpc *FakePDControl) SetPDClient(namespace Namespace, tcName string, pdclient PDClient) {
fpc.defaultPDControl.pdClients[pdClientKey("http", namespace, tcName)] = pdclient
}
Loading

0 comments on commit cff6d3f

Please sign in to comment.