diff --git a/pkg/lightning/backend/local/local.go b/pkg/lightning/backend/local/local.go index ef85fb6d0..0bd7014f5 100644 --- a/pkg/lightning/backend/local/local.go +++ b/pkg/lightning/backend/local/local.go @@ -2309,6 +2309,14 @@ func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.Che if err != nil { return errors.Trace(err) } + clusterId, err := local.g.GetSQLExecutor().ObtainStringWithLog( + ctx, + "select substring(type,8) from METRICS_SCHEMA.PD_CLUSTER_METADATA limit 1;", + "check TiDB Cluster ID", + log.L()) + if err != nil { + return errors.Trace(err) + } if err := checkTiDBVersion(ctx, versionStr, localMinTiDBVersion, localMaxTiDBVersion); err != nil { return err } @@ -2318,6 +2326,9 @@ func (local *local) CheckRequirements(ctx context.Context, checkCtx *backend.Che if err := tikv.CheckTiKVVersion(ctx, local.tls, local.pdAddr, localMinTiKVVersion, localMaxTiKVVersion); err != nil { return err } + if err := tikv.CheckTiDBDestination(ctx, local.tls, local.pdAddr, clusterId); err != nil { + return err + } tidbVersion, _ := version.ExtractTiDBVersion(versionStr) return checkTiFlashVersion(ctx, local.g, checkCtx, *tidbVersion) diff --git a/pkg/lightning/restore/check_info.go b/pkg/lightning/restore/check_info.go index 6bc4691b3..23b82df7c 100644 --- a/pkg/lightning/restore/check_info.go +++ b/pkg/lightning/restore/check_info.go @@ -120,7 +120,7 @@ func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) er // ClusterIsAvailable check cluster is available to import data. this test can be skipped. func (rc *Controller) ClusterIsAvailable(ctx context.Context) error { passed := true - message := "Cluster is available" + message := "Cluster is available and correct" defer func() { rc.checkTemplate.Collect(Critical, passed, message) }() diff --git a/pkg/lightning/tikv/tikv.go b/pkg/lightning/tikv/tikv.go index a3c37b93d..fba694779 100644 --- a/pkg/lightning/tikv/tikv.go +++ b/pkg/lightning/tikv/tikv.go @@ -217,6 +217,18 @@ func CheckPDVersion(ctx context.Context, tls *common.TLS, pdAddr string, require return version.CheckVersion("PD", *ver, requiredMinVersion, requiredMaxVersion) } +func CheckTiDBDestination(ctx context.Context, tls *common.TLS, pdAddr string, clusterId string) error { + id, err := pdutil.FetchClusterID(ctx, tls, pdAddr) + if err != nil { + return errors.Trace(err) + } + + if id != clusterId { + return errors.Errorf("Failed to match the cluster ID, Please check whether status-port is correct") + } + return nil +} + func CheckTiKVVersion(ctx context.Context, tls *common.TLS, pdAddr string, requiredMinVersion, requiredMaxVersion semver.Version) error { return ForAllStores( ctx, diff --git a/pkg/pdutil/pd.go b/pkg/pdutil/pd.go index 9c86bd5f7..cb068ca16 100644 --- a/pkg/pdutil/pd.go +++ b/pkg/pdutil/pd.go @@ -12,6 +12,7 @@ import ( "math" "net/http" "net/url" + "strconv" "strings" "time" @@ -710,3 +711,21 @@ func FetchPDVersion(ctx context.Context, tls *common.TLS, pdAddr string) (*semve return parseVersion([]byte(rawVersion.Version)), nil } + +// FetchClusterID get Cluster ID +func FetchClusterID(ctx context.Context, tls *common.TLS, pdAddr string) (string, error) { + // An example of PD Cluster ID API. + // curl http://pd_address/pd/api/v1/cluster + // { + // "id": 7125154571691814555 + // } + var rawClusterID struct { + Id int `json:"id"` + } + err := tls.WithHost(pdAddr).GetJSON(ctx, "/pd/api/v1/cluster", &rawClusterID) + if err != nil { + return strconv.Itoa(rawClusterID.Id), errors.Trace(err) + } + + return strings.TrimSpace(strconv.Itoa(rawClusterID.Id)), nil +}