Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

create all statefulsets immediately #581

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 1 addition & 10 deletions cmd/wait-for-pd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,6 @@ func pdHasLeader(pdClient pdapi.PDClient) (bool, error) {
return memberInfo != nil, nil
}

func pdIsInitialized(pdClient pdapi.PDClient) (*bool, error) {
status, err := pdClient.GetClusterStatus()
if err != nil {
f := false
return &f, err
}
return status.IsInitialized, nil
}

// On older versions this will return the empty semver version 0.0.0
// Most tidb-operator users will be using version 3.0+ which has the version field.
func pdVersion(pdClient pdapi.PDClient) (string, error) {
Expand Down Expand Up @@ -113,7 +104,7 @@ func main() {
}

waitForInitializationFunc := func() bool {
isInit, err := pdIsInitialized(pdClient)
isInit, err := pdClient.GetClusterInitialized()
if err != nil {
glog.Infof("Error using pdClient to get cluster status %v", err)
} else if isInit == nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/tidbcluster/tidb_cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ func NewController(
setControl,
svcControl,
tidbControl,
pdControl,
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
Expand Down
11 changes: 9 additions & 2 deletions pkg/manager/member/tidb_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type tidbMemberManager struct {
setControl controller.StatefulSetControlInterface
svcControl controller.ServiceControlInterface
tidbControl controller.TiDBControlInterface
pdControl pdapi.PDControlInterface
setLister v1beta1.StatefulSetLister
svcLister corelisters.ServiceLister
podLister corelisters.PodLister
Expand All @@ -57,6 +58,7 @@ type tidbMemberManager struct {
func NewTiDBMemberManager(setControl controller.StatefulSetControlInterface,
svcControl controller.ServiceControlInterface,
tidbControl controller.TiDBControlInterface,
pdControl pdapi.PDControlInterface,
setLister v1beta1.StatefulSetLister,
svcLister corelisters.ServiceLister,
podLister corelisters.PodLister,
Expand All @@ -68,6 +70,7 @@ func NewTiDBMemberManager(setControl controller.StatefulSetControlInterface,
setControl: setControl,
svcControl: svcControl,
tidbControl: tidbControl,
pdControl: pdControl,
setLister: setLister,
svcLister: svcLister,
podLister: podLister,
Expand Down Expand Up @@ -151,8 +154,12 @@ func (tmm *tidbMemberManager) syncTiDBStatefulSetForTidbCluster(tc *v1alpha1.Tid
tc.Status.TiDB.StatefulSet = &apps.StatefulSetStatus{}
return nil
}

if !tc.TiKVIsAvailable() {
initialized, err := controller.GetPDClient(tmm.pdControl, tc).GetClusterInitialized()
gregwebs marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}
// initialized is nil in TiDB versions (< 3.0)
if (initialized != nil && !*initialized) || (initialized == nil && !tc.TiKVIsAvailable()) {
return controller.RequeueErrorf("TidbCluster: [%s/%s], waiting for TiKV cluster running", ns, tcName)
}

Expand Down
20 changes: 14 additions & 6 deletions pkg/manager/member/tidb_member_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
informers "github.com/pingcap/tidb-operator/pkg/client/informers/externalversions"
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/pdapi"
apps "k8s.io/api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -62,7 +63,7 @@ func TestTiDBMemberManagerSyncCreate(t *testing.T) {
test.prepare(tc)
}

tmm, fakeSetControl, _, _ := newFakeTiDBMemberManager()
tmm, fakeSetControl, _, _, _ := newFakeTiDBMemberManager()

if test.errWhenCreateStatefulSet {
fakeSetControl.SetCreateStatefulSetError(errors.NewInternalError(fmt.Errorf("API server failed")), 0)
Expand Down Expand Up @@ -140,7 +141,7 @@ func TestTiDBMemberManagerSyncUpdate(t *testing.T) {
ns := tc.GetNamespace()
tcName := tc.GetName()

tmm, fakeSetControl, _, _ := newFakeTiDBMemberManager()
tmm, fakeSetControl, _, _, fakePDControl := newFakeTiDBMemberManager()

if test.statusChange == nil {
fakeSetControl.SetStatusChange(func(set *apps.StatefulSet) {
Expand Down Expand Up @@ -168,6 +169,11 @@ func TestTiDBMemberManagerSyncUpdate(t *testing.T) {
fakeSetControl.SetUpdateStatefulSetError(errors.NewInternalError(fmt.Errorf("API server failed")), 0)
}

pdClient := controller.NewFakePDClient(fakePDControl, tc)
pdClient.AddReaction(pdapi.GetClusterInitializedActionType, func(action *pdapi.Action) (interface{}, error) {
return func() *bool { return nil }(), nil
})

err = tmm.Sync(tc1)
if test.err {
g.Expect(err).To(HaveOccurred())
Expand Down Expand Up @@ -239,7 +245,7 @@ func TestTiDBMemberManagerTiDBStatefulSetIsUpgrading(t *testing.T) {
expectUpgrading bool
}
testFn := func(test *testcase, t *testing.T) {
pmm, _, podIndexer, _ := newFakeTiDBMemberManager()
pmm, _, podIndexer, _, _ := newFakeTiDBMemberManager()
tc := newTidbClusterForTiDB()
tc.Status.TiDB.StatefulSet = &apps.StatefulSetStatus{
UpdateRevision: "v3",
Expand Down Expand Up @@ -352,7 +358,7 @@ func TestTiDBMemberManagerSyncTidbClusterStatus(t *testing.T) {
if test.updateTC != nil {
test.updateTC(tc)
}
pmm, _, _, tidbControl := newFakeTiDBMemberManager()
pmm, _, _, tidbControl, _ := newFakeTiDBMemberManager()

if test.upgradingFn != nil {
pmm.tidbStatefulSetIsUpgradingFn = test.upgradingFn
Expand Down Expand Up @@ -521,7 +527,7 @@ func TestTiDBMemberManagerSyncTidbClusterStatus(t *testing.T) {
}
}

func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSetControl, cache.Indexer, *controller.FakeTiDBControl) {
func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSetControl, cache.Indexer, *controller.FakeTiDBControl, *pdapi.FakePDControl) {
cli := fake.NewSimpleClientset()
kubeCli := kubefake.NewSimpleClientset()
setInformer := kubeinformers.NewSharedInformerFactory(kubeCli, 0).Apps().V1beta1().StatefulSets()
Expand All @@ -534,12 +540,14 @@ func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSet
tidbUpgrader := NewFakeTiDBUpgrader()
tidbFailover := NewFakeTiDBFailover()
tidbControl := controller.NewFakeTiDBControl()
pdControl := pdapi.NewFakePDControl()
operatorImage := "pingcap/tidb-operator:latest"

tmm := &tidbMemberManager{
setControl,
svcControl,
tidbControl,
pdControl,
setInformer.Lister(),
svcInformer.Lister(),
podInformer.Lister(),
Expand All @@ -549,7 +557,7 @@ func newFakeTiDBMemberManager() (*tidbMemberManager, *controller.FakeStatefulSet
tidbFailover,
tidbStatefulSetIsUpgrading,
}
return tmm, setControl, podInformer.Informer().GetIndexer(), tidbControl
return tmm, setControl, podInformer.Informer().GetIndexer(), tidbControl, pdControl
}

func newTidbClusterForTiDB() *v1alpha1.TidbCluster {
Expand Down
23 changes: 16 additions & 7 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ type PDClient interface {
GetConfig() (*server.Config, error)
// GetCluster returns used when syncing pod labels.
GetCluster() (*metapb.Cluster, error)
// GetClusterStatus is used to determine if the cluster is initialized
GetClusterStatus() (*ClusterStatus, error)
// GetClusterInitialized is used to determine if the cluster is initialized
GetClusterInitialized() (*bool, error)
// GetMembers returns all PD members from cluster
GetMembers() (*MembersInfo, error)
// GetStores lists all TiKV stores from cluster
Expand Down Expand Up @@ -250,7 +250,7 @@ func (pc *pdClient) GetConfig() (*server.Config, error) {
return config, nil
}

func (pc *pdClient) GetClusterStatus() (*ClusterStatus, error) {
func (pc *pdClient) getClusterStatus() (*ClusterStatus, error) {
apiURL := fmt.Sprintf("%s/%s", pc.url, clusterStatusPrefix)
body, err := httputil.GetBodyOK(pc.httpClient, apiURL)
if err != nil {
Expand All @@ -264,6 +264,15 @@ func (pc *pdClient) GetClusterStatus() (*ClusterStatus, error) {
return clusterStatus, nil
}

func (pc *pdClient) GetClusterInitialized() (*bool, error) {
status, err := pc.getClusterStatus()
if err != nil {
f := false
return &f, err
}
return status.IsInitialized, nil
}

func (pc *pdClient) GetCluster() (*metapb.Cluster, error) {
apiURL := fmt.Sprintf("%s/%s", pc.url, clusterIDPrefix)
body, err := httputil.GetBodyOK(pc.httpClient, apiURL)
Expand Down Expand Up @@ -630,7 +639,7 @@ const (
GetHealthActionType ActionType = "GetHealth"
GetConfigActionType ActionType = "GetConfig"
GetClusterActionType ActionType = "GetCluster"
GetClusterStatusActionType ActionType = "GetClusterStatus"
GetClusterInitializedActionType ActionType = "GetClusterInitialized"
GetMembersActionType ActionType = "GetMembers"
GetStoresActionType ActionType = "GetStores"
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
Expand Down Expand Up @@ -713,13 +722,13 @@ func (pc *FakePDClient) GetCluster() (*metapb.Cluster, error) {
return result.(*metapb.Cluster), nil
}

func (pc *FakePDClient) GetClusterStatus() (*ClusterStatus, error) {
func (pc *FakePDClient) GetClusterInitialized() (*bool, error) {
action := &Action{}
result, err := pc.fakeAPI(GetClusterStatusActionType, action)
result, err := pc.fakeAPI(GetClusterInitializedActionType, action)
if err != nil {
return nil, err
}
return result.(*ClusterStatus), nil
return result.(*bool), nil
}

func (pc *FakePDClient) GetMembers() (*MembersInfo, error) {
Expand Down