From 11f600549098b81c3d037f4fdffc7376a554691d Mon Sep 17 00:00:00 2001 From: Hu# Date: Tue, 9 Apr 2024 20:17:45 +0800 Subject: [PATCH] playground/pdms: wait for tso component ready (#2394) * wait for tso Signed-off-by: husharp * change api Signed-off-by: husharp * fix test Signed-off-by: husharp --------- Signed-off-by: husharp --- components/playground/instance/tikv.go | 37 +++++++++++++++++++++----- components/playground/playground.go | 2 +- pkg/cluster/api/pdapi.go | 28 +++++++++++++++++++ 3 files changed, 59 insertions(+), 8 deletions(-) diff --git a/components/playground/instance/tikv.go b/components/playground/instance/tikv.go index 1448714874..08dfe04ebe 100644 --- a/components/playground/instance/tikv.go +++ b/components/playground/instance/tikv.go @@ -18,7 +18,9 @@ import ( "fmt" "path/filepath" "strings" + "time" + "github.com/pingcap/tiup/pkg/cluster/api" tiupexec "github.com/pingcap/tiup/pkg/exec" "github.com/pingcap/tiup/pkg/utils" ) @@ -26,14 +28,16 @@ import ( // TiKVInstance represent a running tikv-server type TiKVInstance struct { instance - pds []*PDInstance + pds []*PDInstance + tsos []*PDInstance Process - isCSEMode bool - cseOpts CSEOptions + isCSEMode bool + cseOpts CSEOptions + isPDMSMode bool } // NewTiKVInstance return a TiKVInstance -func NewTiKVInstance(binPath string, dir, host, configPath string, id int, port int, pds []*PDInstance, isCSEMode bool, cseOptions CSEOptions) *TiKVInstance { +func NewTiKVInstance(binPath string, dir, host, configPath string, id int, port int, pds []*PDInstance, tsos []*PDInstance, isCSEMode bool, cseOptions CSEOptions, isPDMSMode bool) *TiKVInstance { if port <= 0 { port = 20160 } @@ -47,9 +51,11 @@ func NewTiKVInstance(binPath string, dir, host, configPath string, id int, port StatusPort: utils.MustGetFreePort(host, 20180), ConfigPath: configPath, }, - pds: pds, - isCSEMode: isCSEMode, - cseOpts: cseOptions, + pds: pds, + tsos: tsos, + isCSEMode: isCSEMode, + cseOpts: cseOptions, + isPDMSMode: isPDMSMode, 
} } @@ -69,6 +75,23 @@ func (inst *TiKVInstance) Start(ctx context.Context, version utils.Version) erro return err } + // Need to check tso status + if inst.isPDMSMode { + var tsoEnds []string + for _, pd := range inst.tsos { + tsoEnds = append(tsoEnds, fmt.Sprintf("%s:%d", AdvertiseHost(pd.Host), pd.StatusPort)) + } + pdcli := api.NewPDClient(ctx, + tsoEnds, 10*time.Second, nil, + ) + if err := pdcli.CheckTSOHealth(&utils.RetryOption{ + Delay: time.Second * 5, + Timeout: time.Second * 300, + }); err != nil { + return err + } + } + endpoints := pdEndpoints(inst.pds, true) args := []string{ fmt.Sprintf("--addr=%s", utils.JoinHostPort(inst.Host, inst.Port)), diff --git a/components/playground/playground.go b/components/playground/playground.go index a20b9dfcbc..7677d1f824 100644 --- a/components/playground/playground.go +++ b/components/playground/playground.go @@ -735,7 +735,7 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif ins = inst p.tidbs = append(p.tidbs, inst) case spec.ComponentTiKV: - inst := instance.NewTiKVInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.bootOptions.Mode == "tidb-cse", p.bootOptions.CSEOpts) + inst := instance.NewTiKVInstance(cfg.BinPath, dir, host, cfg.ConfigPath, id, cfg.Port, p.pds, p.tsos, p.bootOptions.Mode == "tidb-cse", p.bootOptions.CSEOpts, p.bootOptions.PDMode == "ms") ins = inst p.tikvs = append(p.tikvs, inst) case spec.ComponentTiFlash: diff --git a/pkg/cluster/api/pdapi.go b/pkg/cluster/api/pdapi.go index 4e6f17a930..329946bacf 100644 --- a/pkg/cluster/api/pdapi.go +++ b/pkg/cluster/api/pdapi.go @@ -139,6 +139,7 @@ var ( pdStoresURI = "pd/api/v1/stores" pdStoresLimitURI = "pd/api/v1/stores/limit" pdRegionsCheckURI = "pd/api/v1/regions/check" + tsoHealthPrefix = "tso/api/v1/health" ) func tryURLs(endpoints []string, f func(endpoint string) ([]byte, error)) ([]byte, error) { @@ -198,6 +199,33 @@ func (pc *PDClient) CheckHealth() error { return nil } +// CheckTSOHealth 
checks the health of the TSO service (which is a Micro Service component of PD).
+//
+// Every endpoint returned by pc.getEndpoints is requested with pc.httpClient.Get
+// inside utils.Retry, which is configured by retryOpt. One attempt succeeds only
+// when a full pass over the endpoints finishes without an error.
+// retryOpt is dereferenced, so it must not be nil.
+func (pc *PDClient) CheckTSOHealth(retryOpt *utils.RetryOption) error {
+	// tsoHealthPrefix already begins with "tso/"; prepending it again would query tso/tso/api/v1/health.
+	endpoints := pc.getEndpoints(tsoHealthPrefix)
+
+	if err := utils.Retry(func() error {
+		for _, endpoint := range endpoints {
+			if _, err := pc.httpClient.Get(pc.ctx, endpoint); err != nil {
+				// Return the error so that the retry keeps going.
+				pc.l().Debugf("Still waiting for the PD Micro Service's TSO health")
+				return err
+			}
+		}
+		return nil
+	}, *retryOpt); err != nil {
+		// %w keeps the underlying error reachable through errors.Is / errors.As.
+		return fmt.Errorf("error check PD Micro Service's TSO health, %w", err)
+	}
+
+	return nil
+}
+
 // GetStores queries the stores info from PD server
 func (pc *PDClient) GetStores() (*StoresInfo, error) {
 	// Return all stores