diff --git a/cmd/wait-for-pd/main.go b/cmd/wait-for-pd/main.go index b1e5cf46b4..1de06976a1 100644 --- a/cmd/wait-for-pd/main.go +++ b/cmd/wait-for-pd/main.go @@ -16,9 +16,11 @@ package main import ( "github.com/golang/glog" "github.com/pingcap/tidb-operator/pkg/pdapi" + "github.com/pingcap/tidb-operator/version" "k8s.io/apiserver/pkg/util/logs" "flag" + "fmt" "os" "time" ) @@ -27,12 +29,55 @@ const ( timeout = 5 * time.Second ) +var ( + printVersion bool + waitForInitialization bool + waitForLeader bool +) + func init() { + flag.BoolVar(&printVersion, "V", false, "Show version and quit") + flag.BoolVar(&printVersion, "version", false, "Show version and quit") + flag.BoolVar(&waitForInitialization, pdapi.WaitForInitializationFlag, false, "Wait for initialization of the cluster. This means all replicas have come online") + flag.BoolVar(&waitForLeader, pdapi.WaitForLeaderFlag, false, "Wait for just the presence of a PD leader.") flag.Parse() } +func pdHasLeader(pdClient pdapi.PDClient) (bool, error) { + memberInfo, err := pdClient.GetPDLeader() + if err != nil { + return false, err + } + return memberInfo != nil, nil +} + +func pdIsInitialized(pdClient pdapi.PDClient) (*bool, error) { + status, err := pdClient.GetClusterStatus() + if err != nil { + f := false + return &f, err + } + return status.IsInitialized, nil +} + +// On older versions this will return the empty semver version 0.0.0 +// Most tidb-operator users will be using version 3.0+ which has the version field. +func pdVersion(pdClient pdapi.PDClient) (string, error) { + conf, err := pdClient.GetConfig() + if err != nil { + return "", err + } + return fmt.Sprintf("%s", conf.ClusterVersion), nil +} + // wait-for-pd waits for 1 PD to be running func main() { + if printVersion { + version.PrintVersionInfo() + os.Exit(0) + } + version.LogVersionInfo() + logs.InitLogs() defer logs.FlushLogs() @@ -52,12 +97,51 @@ func main() { pdClient := pdapi.NewPDClient(pdapi.PdClientURL(pdapi.Namespace(namespace), tcName), timeout) - for { - membersInfo, err := pdClient.GetMembers() + var waitFunction func() bool + + waitForLeaderFunc := func() bool { + hasLeader, err := pdHasLeader(pdClient) if err != nil { - glog.Errorf("Error using pdClient to get members %v", err) - } else if membersInfo.Leader != nil { + glog.Infof("Error using pdClient to get members %v", err) + } else if hasLeader { glog.Infof("Found a PD member. Exiting now.") + return true + } else { + glog.Infof("PD Leader not found") + } + return false + } + + waitForInitializationFunc := func() bool { + isInit, err := pdIsInitialized(pdClient) + if err != nil { + glog.Infof("Error using pdClient to get cluster status %v", err) + } else if isInit == nil { + version, verr := pdVersion(pdClient) + if verr != nil || version == "" { + glog.Errorf("Error using pdClient to get cluster version %v", verr) + } + glog.Warningf("For this PD version %s the cluster status API does not support is_initialized. Will now wait for just a PD leader", version) + waitFunction = waitForLeaderFunc + } else if *isInit { + glog.Infof("Cluster is inititialized. Exiting now.") + return true + } else { + glog.Infof("PD is not initialized") + } + return false + } + + if waitForInitialization { + waitFunction = waitForInitializationFunc + } else if waitForLeader { + waitFunction = waitForLeaderFunc + } else { + glog.Fatalf("Expected either the flag --initialization or --leader") + } + + for { + if waitFunction() { break } time.Sleep(oneSecond) diff --git a/pkg/manager/member/member.go b/pkg/manager/member/member.go index a3100da3d6..4304a7ec38 100644 --- a/pkg/manager/member/member.go +++ b/pkg/manager/member/member.go @@ -5,7 +5,7 @@ import ( ) // WaitForPDContainer gives the container spec for the wait-for-pd init container -func WaitForPDContainer(tcName string, operatorImage string) corev1.Container { +func WaitForPDContainer(tcName string, operatorImage string, arguments []string) corev1.Container { initEnvs := []corev1.EnvVar{ { Name: "NAMESPACE", @@ -24,7 +24,7 @@ func WaitForPDContainer(tcName string, operatorImage string) corev1.Container { return corev1.Container{ Name: "wait-for-pd", Image: operatorImage, - Command: []string{"wait-for-pd"}, + Command: append([]string{"wait-for-pd"}, arguments...), Env: initEnvs, } } diff --git a/pkg/manager/member/tidb_member_manager.go b/pkg/manager/member/tidb_member_manager.go index 35c1006604..e05d0fd5b8 100644 --- a/pkg/manager/member/tidb_member_manager.go +++ b/pkg/manager/member/tidb_member_manager.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb-operator/pkg/controller" "github.com/pingcap/tidb-operator/pkg/label" "github.com/pingcap/tidb-operator/pkg/manager" + "github.com/pingcap/tidb-operator/pkg/pdapi" "github.com/pingcap/tidb-operator/pkg/util" apps "k8s.io/api/apps/v1beta1" corev1 "k8s.io/api/core/v1" @@ -358,7 +359,7 @@ func (tmm *tidbMemberManager) getNewTiDBSetForTidbCluster(tc *v1alpha1.TidbClust SchedulerName: tc.Spec.SchedulerName, Affinity: tc.Spec.TiDB.Affinity, NodeSelector: tc.Spec.TiDB.NodeSelector, - InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tmm.operatorImage)}, + InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tmm.operatorImage, []string{"-" + pdapi.WaitForInitializationFlag})}, Containers: containers, RestartPolicy: corev1.RestartPolicyAlways, Tolerations: tc.Spec.TiDB.Tolerations, diff --git a/pkg/manager/member/tikv_member_manager.go b/pkg/manager/member/tikv_member_manager.go index 8863d548fc..0a23fdc64d 100644 --- a/pkg/manager/member/tikv_member_manager.go +++ b/pkg/manager/member/tikv_member_manager.go @@ -350,7 +350,7 @@ func (tkmm *tikvMemberManager) getNewSetForTidbCluster(tc *v1alpha1.TidbCluster) SchedulerName: tc.Spec.SchedulerName, Affinity: tc.Spec.TiKV.Affinity, NodeSelector: tc.Spec.TiKV.NodeSelector, - InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tkmm.operatorImage)}, + InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tkmm.operatorImage, []string{"-" + pdapi.WaitForLeaderFlag})}, Containers: []corev1.Container{ { Name: v1alpha1.TiKVMemberType.String(), diff --git a/pkg/pdapi/pdapi.go b/pkg/pdapi/pdapi.go index 51b4fe7ee1..cf8a1f1bda 100644 --- a/pkg/pdapi/pdapi.go +++ b/pkg/pdapi/pdapi.go @@ -32,6 +32,11 @@ import ( const ( timeout = 5 * time.Second + + // WaitForInitializationFlag is located here only because it is convenient + WaitForInitializationFlag = "initialization" + // WaitForLeaderFlag is located here only because it is convenient + WaitForLeaderFlag = "leader" ) // Namespace is a newtype of a string @@ -83,6 +88,8 @@ type PDClient interface { GetConfig() (*server.Config, error) // GetCluster returns used when syncing pod labels. GetCluster() (*metapb.Cluster, error) + // GetClusterStatus is used to determine if the cluster is initialized + GetClusterStatus() (*ClusterStatus, error) // GetMembers returns all PD members from cluster GetMembers() (*MembersInfo, error) // GetStores lists all TiKV stores from cluster @@ -120,6 +127,7 @@ var ( storePrefix = "pd/api/v1/store" configPrefix = "pd/api/v1/config" clusterIDPrefix = "pd/api/v1/cluster" + clusterStatusPrefix = "pd/api/v1/cluster/status" schedulersPrefix = "pd/api/v1/schedulers" pdLeaderPrefix = "pd/api/v1/leader" pdLeaderTransferPrefix = "pd/api/v1/leader/transfer" @@ -139,6 +147,13 @@ func NewPDClient(url string, timeout time.Duration) PDClient { } } +// ClusterStatus is the same as server.ClusterStatus except that IsInitialized is nullable. +// This allows for backwards compatibility testing. +type ClusterStatus struct { + RaftBootstrapTime time.Time `json:"raft_bootstrap_time,omitempty"` + IsInitialized *bool `json:"is_initialized"` +} + // following struct definitions are copied from github.com/pingcap/pd/server/api/store // these are not exported by that package @@ -233,6 +248,20 @@ func (pc *pdClient) GetConfig() (*server.Config, error) { return config, nil } +func (pc *pdClient) GetClusterStatus() (*ClusterStatus, error) { + apiURL := fmt.Sprintf("%s/%s", pc.url, clusterStatusPrefix) + body, err := httputil.GetBodyOK(pc.httpClient, apiURL) + if err != nil { + return nil, err + } + clusterStatus := &ClusterStatus{} + err = json.Unmarshal(body, clusterStatus) + if err != nil { + return nil, err + } + return clusterStatus, nil +} + func (pc *pdClient) GetCluster() (*metapb.Cluster, error) { apiURL := fmt.Sprintf("%s/%s", pc.url, clusterIDPrefix) body, err := httputil.GetBodyOK(pc.httpClient, apiURL) @@ -594,6 +623,7 @@ const ( GetHealthActionType ActionType = "GetHealth" GetConfigActionType ActionType = "GetConfig" GetClusterActionType ActionType = "GetCluster" + GetClusterStatusActionType ActionType = "GetClusterStatus" GetMembersActionType ActionType = "GetMembers" GetStoresActionType ActionType = "GetStores" GetTombStoneStoresActionType ActionType = "GetTombStoneStores" @@ -676,6 +706,15 @@ func (pc *FakePDClient) GetCluster() (*metapb.Cluster, error) { return result.(*metapb.Cluster), nil } +func (pc *FakePDClient) GetClusterStatus() (*ClusterStatus, error) { + action := &Action{} + result, err := pc.fakeAPI(GetClusterStatusActionType, action) + if err != nil { + return nil, err + } + return result.(*ClusterStatus), nil +} + func (pc *FakePDClient) GetMembers() (*MembersInfo, error) { action := &Action{} result, err := pc.fakeAPI(GetMembersActionType, action)