Skip to content

Commit

Permalink
don't move on to failover until the cluster is initialized
Browse files Browse the repository at this point in the history
This only works on newer versions of TiDB
  • Loading branch information
gregwebs committed Jul 24, 2019
1 parent 5151091 commit e50a407
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 25 deletions.
11 changes: 1 addition & 10 deletions cmd/wait-for-pd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ func pdHasLeader(pdClient pdapi.PDClient) (bool, error) {
return memberInfo != nil, nil
}

func pdIsInitialized(pdClient pdapi.PDClient) (*bool, error) {
status, err := pdClient.GetClusterStatus()
if err != nil {
f := false
return &f, err
}
return status.IsInitialized, nil
}

// On older versions this will return the empty semver version 0.0.0
// Most tidb-operator users will be using version 3.0+ which has the version field.
func pdVersion(pdClient pdapi.PDClient) (string, error) {
Expand Down Expand Up @@ -113,7 +104,7 @@ func main() {
}

waitForInitializationFunc := func() bool {
isInit, err := pdIsInitialized(pdClient)
isInit, err := pdClient.GetClusterInitialized()
if err != nil {
glog.Infof("Error using pdClient to get cluster status %v", err)
} else if isInit == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func NewController(
setControl,
svcControl,
tidbControl,
pdControl,
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
Expand Down
11 changes: 9 additions & 2 deletions pkg/manager/member/tidb_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type tidbMemberManager struct {
setControl controller.StatefulSetControlInterface
svcControl controller.ServiceControlInterface
tidbControl controller.TiDBControlInterface
pdControl pdapi.PDControlInterface
setLister v1beta1.StatefulSetLister
svcLister corelisters.ServiceLister
podLister corelisters.PodLister
Expand All @@ -57,6 +58,7 @@ type tidbMemberManager struct {
func NewTiDBMemberManager(setControl controller.StatefulSetControlInterface,
svcControl controller.ServiceControlInterface,
tidbControl controller.TiDBControlInterface,
pdControl pdapi.PDControlInterface,
setLister v1beta1.StatefulSetLister,
svcLister corelisters.ServiceLister,
podLister corelisters.PodLister,
Expand All @@ -68,6 +70,7 @@ func NewTiDBMemberManager(setControl controller.StatefulSetControlInterface,
setControl: setControl,
svcControl: svcControl,
tidbControl: tidbControl,
pdControl: pdControl,
setLister: setLister,
svcLister: svcLister,
podLister: podLister,
Expand Down Expand Up @@ -151,8 +154,12 @@ func (tmm *tidbMemberManager) syncTiDBStatefulSetForTidbCluster(tc *v1alpha1.Tid
tc.Status.TiDB.StatefulSet = &apps.StatefulSetStatus{}
return nil
}

if !tc.TiKVIsAvailable() {
initialized, err := controller.GetPDClient(tmm.pdControl, tc).GetClusterInitialized()
if err != nil {
return err
}
// initialized is nil in TiDB versions (< 3.0)
if (initialized != nil && !*initialized) || (initialized == nil && !tc.TiKVIsAvailable()) {
return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for TiKV cluster running", ns, tcName)
}

Expand Down
20 changes: 14 additions & 6 deletions pkg/manager/member/tidb_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestTiDBMemberManagerSyncCreate(t *testing.T) {
test.prepare(tc)
}

tmm, fakeSetControl, _, _ := newFakeTiDBMemberManager()
tmm, fakeSetControl, _, _, _ := newFakeTiDBMemberManager()

if test.errWhenCreateStatefulSet {
fakeSetControl.SetCreateStatefulSetError(errors.NewInternalError(fmt.Errorf("API server failed")), 0)
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestTiDBMemberManagerSyncUpdate(t *testing.T) {
ns := tc.GetNamespace()
tcName := tc.GetName()

tmm, fakeSetControl, _, _ := newFakeTiDBMemberManager()
tmm, fakeSetControl, _, _, fakePDControl := newFakeTiDBMemberManager()

if test.statusChange == nil {
fakeSetControl.SetStatusChange(func(set *apps.StatefulSet) {
Expand Down Expand Up @@ -168,6 +169,11 @@ func TestTiDBMemberManagerSyncUpdate(t *testing.T) {
fakeSetControl.SetUpdateStatefulSetError(errors.NewInternalError(fmt.Errorf("API server failed")), 0)
}

pdClient := controller.NewFakePDClient(fakePDControl, tc)
pdClient.AddReaction(pdapi.GetClusterInitializedActionType, func(action *pdapi.Action) (interface{}, error) {
return func() *bool { return nil }(), nil
})

err = tmm.Sync(tc1)
if test.err {
g.Expect(err).To(HaveOccurred())
Expand Down Expand Up @@ -239,7 +245,7 @@ func TestTiDBMemberManagerTiDBStatefulSetIsUpgrading(t *testing.T) {
expectUpgrading bool
}
testFn := func(test *testcase, t *testing.T) {
pmm, _, podIndexer, _ := newFakeTiDBMemberManager()
pmm, _, podIndexer, _, _ := newFakeTiDBMemberManager()
tc := newTidbClusterForTiDB()
tc.Status.TiDB.StatefulSet = &apps.StatefulSetStatus{
UpdateRevision: "v3",
Expand Down Expand Up @@ -352,7 +358,7 @@ func TestTiDBMemberManagerSyncTidbClusterStatus(t *testing.T) {
if test.updateTC != nil {
test.updateTC(tc)
}
pmm, _, _, tidbControl := newFakeTiDBMemberManager()
pmm, _, _, tidbControl, _ := newFakeTiDBMemberManager()

if test.upgradingFn != nil {
pmm.tidbStatefulSetIsUpgradingFn = test.upgradingFn
Expand Down Expand Up @@ -521,7 +527,7 @@ func TestTiDBMemberManagerSyncTidbClusterStatus(t *testing.T) {
}
}

func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSetControl, cache.Indexer, *controller.FakeTiDBControl) {
func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSetControl, cache.Indexer, *controller.FakeTiDBControl, *pdapi.FakePDControl) {
cli := fake.NewSimpleClientset()
kubeCli := kubefake.NewSimpleClientset()
setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets()
Expand All @@ -534,12 +540,14 @@ func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSet
tidbUpgrader := NewFakeTiDBUpgrader()
tidbFailover := NewFakeTiDBFailover()
tidbControl := controller.NewFakeTiDBControl()
pdControl := pdapi.NewFakePDControl()
operatorImage := "pingcap/tidb-operator:latest"

tmm := &tidbMemberManager{
setControl,
svcControl,
tidbControl,
pdControl,
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
Expand All @@ -549,7 +557,7 @@ func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSet
tidbFailover,
tidbStatefulSetIsUpgrading,
}
return tmm, setControl, podInformer.Informer().GetIndexer(), tidbControl
return tmm, setControl, podInformer.Informer().GetIndexer(), tidbControl, pdControl
}

func newTidbClusterForTiDB() *v1alpha1.TidbCluster {
Expand Down
23 changes: 16 additions & 7 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type PDClient interface {
GetConfig() (*server.Config, error)
// GetCluster returns used when syncing pod labels.
GetCluster() (*metapb.Cluster, error)
// GetClusterStatus is used to determine if the cluster is initialized
GetClusterStatus() (*ClusterStatus, error)
// GetClusterInitialized is used to determine if the cluster is initialized
GetClusterInitialized() (*bool, error)
// GetMembers returns all PD members from cluster
GetMembers() (*MembersInfo, error)
// GetStores lists all TiKV stores from cluster
Expand Down Expand Up @@ -250,7 +250,7 @@ func (pc *pdClient) GetConfig() (*server.Config, error) {
return config, nil
}

func (pc *pdClient) GetClusterStatus() (*ClusterStatus, error) {
func (pc *pdClient) getClusterStatus() (*ClusterStatus, error) {
apiURL := fmt.Sprintf("%s/%s", pc.url, clusterStatusPrefix)
body, err := httputil.GetBodyOK(pc.httpClient, apiURL)
if err != nil {
Expand All @@ -264,6 +264,15 @@ func (pc *pdClient) GetClusterStatus() (*ClusterStatus, error) {
return clusterStatus, nil
}

func (pc *pdClient) GetClusterInitialized() (*bool, error) {
status, err := pc.getClusterStatus()
if err != nil {
f := false
return &f, err
}
return status.IsInitialized, nil
}

func (pc *pdClient) GetCluster() (*metapb.Cluster, error) {
apiURL := fmt.Sprintf("%s/%s", pc.url, clusterIDPrefix)
body, err := httputil.GetBodyOK(pc.httpClient, apiURL)
Expand Down Expand Up @@ -630,7 +639,7 @@ const (
GetHealthActionType ActionType = "GetHealth"
GetConfigActionType ActionType = "GetConfig"
GetClusterActionType ActionType = "GetCluster"
GetClusterStatusActionType ActionType = "GetClusterStatus"
GetClusterInitializedActionType ActionType = "GetClusterInitialized"
GetMembersActionType ActionType = "GetMembers"
GetStoresActionType ActionType = "GetStores"
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
Expand Down Expand Up @@ -713,13 +722,13 @@ func (pc *FakePDClient) GetCluster() (*metapb.Cluster, error) {
return result.(*metapb.Cluster), nil
}

func (pc *FakePDClient) GetClusterStatus() (*ClusterStatus, error) {
func (pc *FakePDClient) GetClusterInitialized() (*bool, error) {
action := &Action{}
result, err := pc.fakeAPI(GetClusterStatusActionType, action)
result, err := pc.fakeAPI(GetClusterInitializedActionType, action)
if err != nil {
return nil, err
}
return result.(*ClusterStatus), nil
return result.(*bool), nil
}

func (pc *FakePDClient) GetMembers() (*MembersInfo, error) {
Expand Down

0 comments on commit e50a407

Please sign in to comment.