Skip to content

Commit

Permalink
PD graceful upgrade (#99)
Browse files Browse the repository at this point in the history
* pd graceful upgrade
  • Loading branch information
xiaojingchen authored and tennix committed Sep 29, 2018
1 parent 669b023 commit 44fff7a
Show file tree
Hide file tree
Showing 14 changed files with 761 additions and 198 deletions.
2 changes: 2 additions & 0 deletions pkg/apis/pingcap.com/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,11 @@ type ResourceRequirement struct {

// PDStatus is the observed status of the PD component as recorded on the
// TidbCluster status.
type PDStatus struct {
// Synced is checked by the PD failover logic, which refuses to run and
// reports "pd status sync failed" while this is false.
Synced bool `json:"synced,omitempty"`
// Phase is the current MemberPhase of PD.
// NOTE(review): the set of possible phases is declared elsewhere — confirm.
Phase MemberPhase `json:"phase,omitempty"`
// StatefulSet is a StatefulSet status snapshot; presumably that of the
// StatefulSet backing PD — TODO confirm.
StatefulSet *apps.StatefulSetStatus `json:"statefulSet,omitempty"`
// Members holds the known PD members; failover counts the entries whose
// Health flag is set. NOTE(review): map key is presumably the member name —
// confirm against the status syncer.
Members map[string]PDMember `json:"members,omitempty"`
// Leader is the PD member recorded as leader.
// NOTE(review): presumably populated from PDClient.GetPDLeader — confirm.
Leader PDMember `json:"leader,omitempty"`
// FailureMembers tracks members handled by failover; entries carry the pod
// name, the PV UID and a MemberDeleted flag. NOTE(review): map key appears to
// be the pod name — confirm.
FailureMembers map[string]PDFailureMember `json:"failureMembers,omitempty"`
}

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/pingcap.com/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 71 additions & 19 deletions pkg/controller/pd_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,22 @@ type PDClient interface {
BeginEvictLeader(storeID uint64) error
// EndEvictLeader is used at the end of pod upgrade.
EndEvictLeader(storeID uint64) error
// GetPDLeader returns pd leader
GetPDLeader() (*pdpb.Member, error)
// TransferPDLeader transfers pd leader to specified member
TransferPDLeader(name string) error
}

// URL path prefixes of the PD HTTP endpoints used by pdClient.
// pdLeaderPrefix and pdLeaderTransferPrefix are joined to the client's base
// URL (pc.url) with "/" when a request is built; the remaining prefixes are
// presumably used the same way by the other pdClient methods — TODO confirm.
var (
healthPrefix = "pd/health"
membersPrefix = "pd/api/v1/members"
storesPrefix = "pd/api/v1/stores"
storePrefix = "pd/api/v1/store"
configPrefix = "pd/api/v1/config"
clusterIDPrefix = "pd/api/v1/cluster"
schedulersPrefix = "pd/api/v1/schedulers"
healthPrefix = "pd/health"
membersPrefix = "pd/api/v1/members"
storesPrefix = "pd/api/v1/stores"
storePrefix = "pd/api/v1/store"
configPrefix = "pd/api/v1/config"
clusterIDPrefix = "pd/api/v1/cluster"
schedulersPrefix = "pd/api/v1/schedulers"
pdLeaderPrefix = "pd/api/v1/leader"
pdLeaderTransferPrefix = "pd/api/v1/leader/transfer"
)

// pdClient is default implementation of PDClient
Expand Down Expand Up @@ -323,8 +329,7 @@ func (pc *pdClient) DeleteStore(storeID uint64) error {
return nil
}

err = fmt.Errorf("failed to delete store %d: %v", storeID, string(body))
return err
return fmt.Errorf("failed to delete store %d: %v", storeID, string(body))
}

func (pc *pdClient) DeleteMemberByID(memberID uint64) error {
Expand All @@ -342,8 +347,7 @@ func (pc *pdClient) DeleteMemberByID(memberID uint64) error {
return nil
}
err2 := readErrorBody(res.Body)
err = fmt.Errorf("failed %v to delete member %d: %v", res.StatusCode, memberID, err2)
return err
return fmt.Errorf("failed %v to delete member %d: %v", res.StatusCode, memberID, err2)
}

func (pc *pdClient) DeleteMember(name string) error {
Expand All @@ -361,8 +365,7 @@ func (pc *pdClient) DeleteMember(name string) error {
return nil
}
err2 := readErrorBody(res.Body)
err = fmt.Errorf("failed %v to delete member %s: %v", res.StatusCode, name, err2)
return err
return fmt.Errorf("failed %v to delete member %s: %v", res.StatusCode, name, err2)
}

func (pc *pdClient) SetStoreLabels(storeID uint64, labels map[string]string) (bool, error) {
Expand All @@ -380,8 +383,7 @@ func (pc *pdClient) SetStoreLabels(storeID uint64, labels map[string]string) (bo
return true, nil
}
err2 := readErrorBody(res.Body)
err = fmt.Errorf("failed %v to set store labels: %v", res.StatusCode, err2)
return false, err
return false, fmt.Errorf("failed %v to set store labels: %v", res.StatusCode, err2)
}

func (pc *pdClient) BeginEvictLeader(storeID uint64) error {
Expand All @@ -400,8 +402,7 @@ func (pc *pdClient) BeginEvictLeader(storeID uint64) error {
return nil
}
err2 := readErrorBody(res.Body)
err = fmt.Errorf("failed %v to begin evict leader of store:[%d],error: %v", res.StatusCode, storeID, err2)
return err
return fmt.Errorf("failed %v to begin evict leader of store:[%d],error: %v", res.StatusCode, storeID, err2)
}

func (pc *pdClient) EndEvictLeader(storeID uint64) error {
Expand All @@ -419,8 +420,39 @@ func (pc *pdClient) EndEvictLeader(storeID uint64) error {
return nil
}
err2 := readErrorBody(res.Body)
err = fmt.Errorf("failed %v to end leader evict scheduler of store [%d],error:%v", res.StatusCode, storeID, err2)
return err
return fmt.Errorf("failed %v to end leader evict scheduler of store [%d],error:%v", res.StatusCode, storeID, err2)
}

// GetPDLeader requests the leader endpoint of the PD API and decodes the JSON
// response body into a pdpb.Member.
//
// It returns the error from fetching the body or from JSON decoding unchanged;
// on error the returned member is nil.
func (pc *pdClient) GetPDLeader() (*pdpb.Member, error) {
	data, err := pc.getBodyOK(fmt.Sprintf("%s/%s", pc.url, pdLeaderPrefix))
	if err != nil {
		return nil, err
	}

	var member pdpb.Member
	if err := json.Unmarshal(data, &member); err != nil {
		return nil, err
	}
	return &member, nil
}

// TransferPDLeader issues a POST to the leader-transfer endpoint for the
// member called memberName.
//
// A 200 OK or 404 Not Found response is treated as success. Any other status
// yields an error that carries the status code, the member name and the
// result of readErrorBody on the response body.
func (pc *pdClient) TransferPDLeader(memberName string) error {
	target := fmt.Sprintf("%s/%s/%s", pc.url, pdLeaderTransferPrefix, memberName)
	request, err := http.NewRequest(http.MethodPost, target, nil)
	if err != nil {
		return err
	}

	resp, err := pc.httpClient.Do(request)
	if err != nil {
		return err
	}
	defer DeferClose(resp.Body, &err)

	switch resp.StatusCode {
	case http.StatusOK, http.StatusNotFound:
		return nil
	}

	bodyErr := readErrorBody(resp.Body)
	return fmt.Errorf("failed %v to transfer pd leader to %s,error: %v", resp.StatusCode, memberName, bodyErr)
}

func (pc *pdClient) getBodyOK(apiURL string) ([]byte, error) {
Expand Down Expand Up @@ -497,6 +529,8 @@ const (
SetStoreLabelsActionType ActionType = "SetStoreLabels"
BeginEvictLeaderActionType ActionType = "BeginEvictLeader"
EndEvictLeaderActionType ActionType = "EndEvictLeader"
GetPDLeaderActionType ActionType = "GetPDLeader"
TransferPDLeaderActionType ActionType = "TransferPDLeader"
)

type NotFoundReaction struct {
Expand Down Expand Up @@ -658,3 +692,21 @@ func (pc *FakePDClient) EndEvictLeader(storeID uint64) error {
}
return nil
}

// GetPDLeader runs the reaction registered for GetPDLeaderActionType, if any,
// and returns its result as a *pdpb.Member.
//
// When no reaction is registered it returns (nil, nil). When the reaction
// returns an error, that error is returned with a nil member. The result is
// converted with a checked type assertion: an unchecked assertion would panic
// on a nil result, which is exactly what an error-returning reaction yields.
// A non-nil result of any other type is reported as an error.
func (pc *FakePDClient) GetPDLeader() (*pdpb.Member, error) {
	reaction, ok := pc.reactions[GetPDLeaderActionType]
	if !ok {
		return nil, nil
	}

	result, err := reaction(&Action{})
	if err != nil {
		return nil, err
	}

	leader, ok := result.(*pdpb.Member)
	if !ok && result != nil {
		return nil, fmt.Errorf("unexpected result type %T for action %s", result, GetPDLeaderActionType)
	}
	// leader is nil here when the reaction returned a nil result without error.
	return leader, nil
}

// TransferPDLeader runs the reaction registered for
// TransferPDLeaderActionType, passing an Action whose Name is memberName, and
// returns only the reaction's error. It returns nil when no reaction is
// registered.
func (pc *FakePDClient) TransferPDLeader(memberName string) error {
	reaction, found := pc.reactions[TransferPDLeaderActionType]
	if !found {
		return nil
	}

	_, err := reaction(&Action{Name: memberName})
	return err
}
2 changes: 1 addition & 1 deletion pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func NewController(
pdScaler := mm.NewPDScaler(pdControl, pvcInformer.Lister(), pvcControl)
tikvScaler := mm.NewTiKVScaler(pdControl, pvcInformer.Lister(), pvcControl)
pdFailover := mm.NewPDFailover(cli, pdControl, pdFailoverPeriod, podInformer.Lister(), podControl, pvcInformer.Lister(), pvcControl, pvInformer.Lister())
pdUpgrader := mm.NewPDUpgrader()
pdUpgrader := mm.NewPDUpgrader(pdControl, podControl, podInformer.Lister())
tikvFailover := mm.NewTiKVFailover(pdControl)
tikvUpgrader := mm.NewTiKVUpgrader()
tidbUpgrader := mm.NewTiDBUpgrader()
Expand Down
8 changes: 4 additions & 4 deletions pkg/controller/tidbcluster/tidb_cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,12 @@ func newFakeTidbClusterController() (*Controller, cache.Indexer, cache.Indexer)
podControl := controller.NewRealPodControl(kubeCli, pdControl, podInformer.Lister(), recorder)
pdScaler := mm.NewPDScaler(pdControl, pvcInformer.Lister(), pvcControl)
tikvScaler := mm.NewTiKVScaler(pdControl, pvcInformer.Lister(), pvcControl)
tikvFailover := mm.NewTiKVFailover(pdControl)
pdFailover := mm.NewFakePDFailover()
pdUpgrader := mm.NewPDUpgrader()
tikvUpgrader := mm.NewTiKVUpgrader()
tidbUpgrader := mm.NewTiDBUpgrader()
tikvFailover := mm.NewTiKVFailover(pdControl)
tidbFailover := mm.NewFakeTiDBFailover()
pdUpgrader := mm.NewFakePDUpgrader()
tikvUpgrader := mm.NewFakeTiKVUpgrader()
tidbUpgrader := mm.NewFakeTiDBUpgrader()

tcc.control = NewDefaultTidbClusterControl(
controller.NewRealTidbClusterControl(cli, tcInformer.Lister(), recorder),
Expand Down
87 changes: 60 additions & 27 deletions pkg/manager/member/pd_failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,52 @@ func NewPDFailover(cli versioned.Interface,
pvLister}
}

// cleanOrphanPods deletes pods of failure members that are already marked as
// deleted but whose PVC no longer exists.
//
// For every entry in tc.Status.PD.FailureMembers with MemberDeleted set, the
// PVC name is derived from the ordinal in the pod name. If that PVC is not
// found and the pod is still known to the pod lister, the pod is deleted via
// podControl.
//
// It returns the first error from parsing the ordinal, from looking up the PVC
// or the pod (NotFound excluded), or from deleting the pod. Lookup errors
// other than NotFound are propagated rather than ignored, matching how
// Failover treats PVC lookups.
func (pf *pdFailover) cleanOrphanPods(tc *v1alpha1.TidbCluster) error {
	ns := tc.GetNamespace()
	tcName := tc.GetName()

	for _, pdMember := range tc.Status.PD.FailureMembers {
		if !pdMember.MemberDeleted {
			continue
		}

		podName := pdMember.PodName
		ordinal, err := getOrdinalFromPodName(podName)
		if err != nil {
			return err
		}

		pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal)
		_, err = pf.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
		if err == nil {
			// The PVC still exists, so there is nothing to clean up for this member.
			continue
		}
		if !errors.IsNotFound(err) {
			return err
		}

		pod, err := pf.podLister.Pods(ns).Get(podName)
		if errors.IsNotFound(err) {
			// The pod is already gone.
			continue
		}
		if err != nil {
			return err
		}

		if err := pf.podControl.DeletePod(tc, pod); err != nil {
			return err
		}
	}

	return nil
}

func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
ns := tc.GetNamespace()
tcName := tc.GetName()

if !tc.Status.PD.Synced {
return fmt.Errorf("TidbCluster: %s/%s's pd status sync failed, can't failover", ns, tcName)
}

if tc.Status.PD.FailureMembers == nil {
tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{}
}
err := pf.cleanOrphanPods(tc)
if err != nil {
return err
}

healthCount := 0
for _, pdMember := range tc.Status.PD.Members {
if pdMember.Health {
Expand All @@ -74,9 +116,6 @@ func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
ns, tcName, healthCount, tc.PDRealReplicas(), tc.Spec.PD.Replicas, len(tc.Status.PD.FailureMembers))
}

if tc.Status.PD.FailureMembers == nil {
tc.Status.PD.FailureMembers = map[string]v1alpha1.PDFailureMember{}
}
notDeletedCount := 0
for _, pdMember := range tc.Status.PD.FailureMembers {
if !pdMember.MemberDeleted {
Expand Down Expand Up @@ -117,7 +156,7 @@ func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
}

// invoke deleteMember api to delete a member from the pd cluster
err := pf.pdControl.GetPDClient(tc).DeleteMember(failureMember.PodName)
err = pf.pdControl.GetPDClient(tc).DeleteMember(failureMember.PodName)
if err != nil {
return err
}
Expand All @@ -137,38 +176,32 @@ func (pf *pdFailover) Failover(tc *v1alpha1.TidbCluster) error {
}
pvcName := ordinalPVCName(v1alpha1.PDMemberType, controller.PDMemberName(tcName), ordinal)
pvc, err := pf.pvcLister.PersistentVolumeClaims(ns).Get(pvcName)
if errors.IsNotFound(err) {
if pod != nil && pod.DeletionTimestamp == nil {
err := pf.podControl.DeletePod(tc, pod)
if err != nil {
return err
}
}
setMemberDeleted(tc, failurePodName)
return nil
}
if err != nil {
return err
}
pv, err := pf.pvLister.Get(pvc.Spec.VolumeName)
if errors.IsNotFound(err) {
setMemberDeleted(tc, failurePodName)
return nil
}
if err != nil {
if err != nil && !errors.IsNotFound(err) {
return err
}
if string(pv.UID) != string(failureMember.PVUID) {
setMemberDeleted(tc, failurePodName)
return nil

if pvc != nil {
pv, err := pf.pvLister.Get(pvc.Spec.VolumeName)
if errors.IsNotFound(err) {
setMemberDeleted(tc, failurePodName)
return nil
}
if err != nil {
return err
}
if string(pv.UID) != string(failureMember.PVUID) {
setMemberDeleted(tc, failurePodName)
return nil
}
}

if pod != nil && pod.DeletionTimestamp == nil {
err := pf.podControl.DeletePod(tc, pod)
if err != nil {
return err
}
}
if pvc.DeletionTimestamp == nil {
if pvc != nil && pvc.DeletionTimestamp == nil {
err = pf.pvcControl.DeletePVC(tc, pvc)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 44fff7a

Please sign in to comment.