Skip to content

Commit

Permalink
use the PD cluster status is initialized API
Browse files Browse the repository at this point in the history
  • Loading branch information
gregwebs committed Jul 10, 2019
1 parent 55f49c0 commit 07e3ee4
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 8 deletions.
92 changes: 88 additions & 4 deletions cmd/wait-for-pd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@ package main
import (
"github.com/golang/glog"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/version"
"k8s.io/apiserver/pkg/util/logs"

"flag"
"fmt"
"os"
"time"
)
Expand All @@ -27,12 +29,55 @@ const (
timeout = 5 * time.Second
)

var (
printVersion bool
waitForInitialization bool
waitForLeader bool
)

func init() {
flag.BoolVar(&printVersion, "V", false, "Show version and quit")
flag.BoolVar(&printVersion, "version", false, "Show version and quit")
flag.BoolVar(&waitForInitialization, pdapi.WaitForInitializationFlag, false, "Wait for initialization of the cluster. This means all replicas have come online")
flag.BoolVar(&waitForLeader, pdapi.WaitForLeaderFlag, false, "Wait for just the presence of a PD leader.")
flag.Parse()
}

func pdHasLeader(pdClient pdapi.PDClient) (bool, error) {
memberInfo, err := pdClient.GetPDLeader()
if err != nil {
return false, err
}
return memberInfo != nil, nil
}

func pdIsInitialized(pdClient pdapi.PDClient) (*bool, error) {
status, err := pdClient.GetClusterStatus()
if err != nil {
f := false
return &f, err
}
return status.IsInitialized, nil
}

// On older versions this will return the empty semver version 0.0.0
// Most tidb-operator users will be using version 3.0+ which has the version field.
func pdVersion(pdClient pdapi.PDClient) (string, error) {
conf, err := pdClient.GetConfig()
if err != nil {
return "", err
}
return fmt.Sprintf("%s", conf.ClusterVersion), nil
}

// wait-for-pd waits for 1 PD to be running
func main() {
if printVersion {
version.PrintVersionInfo()
os.Exit(0)
}
version.LogVersionInfo()

logs.InitLogs()
defer logs.FlushLogs()

Expand All @@ -52,12 +97,51 @@ func main() {

pdClient := pdapi.NewPDClient(pdapi.PdClientURL(pdapi.Namespace(namespace), tcName), timeout)

for {
membersInfo, err := pdClient.GetMembers()
var waitFunction func() bool

waitForLeaderFunc := func() bool {
hasLeader, err := pdHasLeader(pdClient)
if err != nil {
glog.Errorf("Error using pdClient to get members %v", err)
} else if membersInfo.Leader != nil {
glog.Infof("Error using pdClient to get members %v", err)
} else if hasLeader {
glog.Infof("Found a PD member. Exiting now.")
return true
} else {
glog.Infof("PD Leader not found")
}
return false
}

waitForInitializationFunc := func() bool {
isInit, err := pdIsInitialized(pdClient)
if err != nil {
glog.Infof("Error using pdClient to get cluster status %v", err)
} else if isInit == nil {
version, verr := pdVersion(pdClient)
if verr != nil || version == "" {
glog.Errorf("Error using pdClient to get cluster version %v", verr)
}
glog.Warningf("For this PD version %s the cluster status API does not support is_initialized. Will now wait for just a PD leader", version)
waitFunction = waitForLeaderFunc
} else if *isInit {
glog.Infof("Cluster is inititialized. Exiting now.")
return true
} else {
glog.Infof("PD is not initialized")
}
return false
}

if waitForInitialization {
waitFunction = waitForInitializationFunc
} else if waitForLeader {
waitFunction = waitForLeaderFunc
} else {
glog.Fatalf("Expected either the flag --initialization or --leader")
}

for {
if waitFunction() {
break
}
time.Sleep(oneSecond)
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/member/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

// WaitForPDContainer gives the container spec for the wait-for-pd init container
func WaitForPDContainer(tcName string, operatorImage string) corev1.Container {
func WaitForPDContainer(tcName string, operatorImage string, arguments []string) corev1.Container {
initEnvs := []corev1.EnvVar{
{
Name: "NAMESPACE",
Expand All @@ -24,7 +24,7 @@ func WaitForPDContainer(tcName string, operatorImage string) corev1.Container {
return corev1.Container{
Name: "wait-for-pd",
Image: operatorImage,
Command: []string{"wait-for-pd"},
Command: append([]string{"wait-for-pd"}, arguments...),
Env: initEnvs,
}
}
3 changes: 2 additions & 1 deletion pkg/manager/member/tidb_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb-operator/pkg/controller"
"github.com/pingcap/tidb-operator/pkg/label"
"github.com/pingcap/tidb-operator/pkg/manager"
"github.com/pingcap/tidb-operator/pkg/pdapi"
"github.com/pingcap/tidb-operator/pkg/util"
apps "k8s.io/api/apps/v1beta1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -358,7 +359,7 @@ func (tmm *tidbMemberManager) getNewTiDBSetForTidbCluster(tc *v1alpha1.TidbClust
SchedulerName: tc.Spec.SchedulerName,
Affinity: tc.Spec.TiDB.Affinity,
NodeSelector: tc.Spec.TiDB.NodeSelector,
InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tmm.operatorImage)},
InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tmm.operatorImage, []string{"-" + pdapi.WaitForInitializationFlag})},
Containers: containers,
RestartPolicy: corev1.RestartPolicyAlways,
Tolerations: tc.Spec.TiDB.Tolerations,
Expand Down
2 changes: 1 addition & 1 deletion pkg/manager/member/tikv_member_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func (tkmm *tikvMemberManager) getNewSetForTidbCluster(tc *v1alpha1.TidbCluster)
SchedulerName: tc.Spec.SchedulerName,
Affinity: tc.Spec.TiKV.Affinity,
NodeSelector: tc.Spec.TiKV.NodeSelector,
InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tkmm.operatorImage)},
InitContainers: []corev1.Container{WaitForPDContainer(tc.GetName(), tkmm.operatorImage, []string{"-" + pdapi.WaitForLeaderFlag})},
Containers: []corev1.Container{
{
Name: v1alpha1.TiKVMemberType.String(),
Expand Down
39 changes: 39 additions & 0 deletions pkg/pdapi/pdapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ import (

const (
timeout = 5 * time.Second

// WaitForInitializationFlag is located here only because it is convenient
WaitForInitializationFlag = "initialization"
// WaitForLeaderFlag is located here only because it is convenient
WaitForLeaderFlag = "leader"
)

// Namespace is a newtype of a string
Expand Down Expand Up @@ -83,6 +88,8 @@ type PDClient interface {
GetConfig() (*server.Config, error)
// GetCluster returns used when syncing pod labels.
GetCluster() (*metapb.Cluster, error)
// GetClusterStatus is used to determine if the cluster is initialized
GetClusterStatus() (*ClusterStatus, error)
// GetMembers returns all PD members from cluster
GetMembers() (*MembersInfo, error)
// GetStores lists all TiKV stores from cluster
Expand Down Expand Up @@ -120,6 +127,7 @@ var (
storePrefix = "pd/api/v1/store"
configPrefix = "pd/api/v1/config"
clusterIDPrefix = "pd/api/v1/cluster"
clusterStatusPrefix = "pd/api/v1/cluster/status"
schedulersPrefix = "pd/api/v1/schedulers"
pdLeaderPrefix = "pd/api/v1/leader"
pdLeaderTransferPrefix = "pd/api/v1/leader/transfer"
Expand All @@ -139,6 +147,13 @@ func NewPDClient(url string, timeout time.Duration) PDClient {
}
}

// ClusterStatus is the same as server.ClusterStatus except that IsInitialized is nullable.
// This allows for backwards compatibility testing.
type ClusterStatus struct {
RaftBootstrapTime time.Time `json:"raft_bootstrap_time,omitempty"`
IsInitialized *bool `json:"is_initialized"`
}

// following struct definitions are copied from github.com/pingcap/pd/server/api/store
// these are not exported by that package

Expand Down Expand Up @@ -233,6 +248,20 @@ func (pc *pdClient) GetConfig() (*server.Config, error) {
return config, nil
}

func (pc *pdClient) GetClusterStatus() (*ClusterStatus, error) {
apiURL := fmt.Sprintf("%s/%s", pc.url, clusterStatusPrefix)
body, err := httputil.GetBodyOK(pc.httpClient, apiURL)
if err != nil {
return nil, err
}
clusterStatus := &ClusterStatus{}
err = json.Unmarshal(body, clusterStatus)
if err != nil {
return nil, err
}
return clusterStatus, nil
}

func (pc *pdClient) GetCluster() (*metapb.Cluster, error) {
apiURL := fmt.Sprintf("%s/%s", pc.url, clusterIDPrefix)
body, err := httputil.GetBodyOK(pc.httpClient, apiURL)
Expand Down Expand Up @@ -594,6 +623,7 @@ const (
GetHealthActionType ActionType = "GetHealth"
GetConfigActionType ActionType = "GetConfig"
GetClusterActionType ActionType = "GetCluster"
GetClusterStatusActionType ActionType = "GetClusterStatus"
GetMembersActionType ActionType = "GetMembers"
GetStoresActionType ActionType = "GetStores"
GetTombStoneStoresActionType ActionType = "GetTombStoneStores"
Expand Down Expand Up @@ -676,6 +706,15 @@ func (pc *FakePDClient) GetCluster() (*metapb.Cluster, error) {
return result.(*metapb.Cluster), nil
}

func (pc *FakePDClient) GetClusterStatus() (*ClusterStatus, error) {
action := &Action{}
result, err := pc.fakeAPI(GetClusterStatusActionType, action)
if err != nil {
return nil, err
}
return result.(*ClusterStatus), nil
}

func (pc *FakePDClient) GetMembers() (*MembersInfo, error) {
action := &Action{}
result, err := pc.fakeAPI(GetMembersActionType, action)
Expand Down

0 comments on commit 07e3ee4

Please sign in to comment.