Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: add helper function to set and query TiFlash's sync status #30473

Merged
merged 13 commits into from
Dec 17, 2021
37 changes: 37 additions & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
"github.com/pingcap/tidb/table"
goutil "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/admin"
"github.com/pingcap/tidb/util/gcutil"
"github.com/pingcap/tidb/util/logutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -731,3 +732,39 @@ func init() {
RunInGoTest = true
}
}

// GetDropOrTruncateTableInfoFromJobsByStore implements GetDropOrTruncateTableInfoFromJobs
func GetDropOrTruncateTableInfoFromJobsByStore(jobs []*model.Job, gcSafePoint uint64, getTable func(uint64, int64, int64) (*model.TableInfo, error), fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
	for _, job := range jobs {
		// The job's start_ts is used as the snapshot read timestamp below, so it
		// must still be readable under the current GC safe point.
		if err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint); err != nil {
			return false, err
		}
		// Only drop-table and truncate-table jobs carry a dropped/truncated table.
		if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
			continue
		}

		tblInfo, err := getTable(job.StartTS, job.SchemaID, job.TableID)
		switch {
		case err != nil && meta.ErrDBNotExists.Equal(err):
			// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
			// then can't find the table from the snapshot info-schema. Should just ignore error here,
			// see more in TestParallelDropSchemaAndDropTable.
			continue
		case err != nil:
			return false, err
		case tblInfo == nil:
			// Same parallel-DDL situation as above: the snapshot has no table for
			// this job, so there is nothing to report.
			continue
		}
		done, err := fn(job, tblInfo)
		if done || err != nil {
			return done, err
		}
	}
	return false, nil
}
39 changes: 6 additions & 33 deletions executor/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,42 +710,15 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta,
// GetDropOrTruncateTableInfoFromJobs gets the dropped/truncated table information from DDL jobs,
// it will use the `start_ts` of DDL job as snapshot to get the dropped/truncated table information.
func GetDropOrTruncateTableInfoFromJobs(jobs []*model.Job, gcSafePoint uint64, dom *domain.Domain, fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) {
for _, job := range jobs {
// Check GC safe point for getting snapshot infoSchema.
err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) {
snapMeta, err := dom.GetSnapshotMeta(StartTS)
if err != nil {
return false, err
}
if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable {
continue
}

snapMeta, err := dom.GetSnapshotMeta(job.StartTS)
if err != nil {
return false, err
}
tbl, err := snapMeta.GetTable(job.SchemaID, job.TableID)
if err != nil {
if meta.ErrDBNotExists.Equal(err) {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
return false, err
}
if tbl == nil {
// The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution,
// then can't find the table from the snapshot info-schema. Should just ignore error here,
// see more in TestParallelDropSchemaAndDropTable.
continue
}
finish, err := fn(job, tbl)
if err != nil || finish {
return finish, err
return nil, err
}
tbl, err := snapMeta.GetTable(SchemaID, TableID)
return tbl, err
}
return false, nil
return ddl.GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, fn)
}

func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.Job, *model.TableInfo, error) {
Expand Down
239 changes: 235 additions & 4 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package helper

import (
"bufio"
"bytes"
"context"
"encoding/hex"
Expand All @@ -32,7 +33,7 @@ import (
"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -875,7 +876,7 @@ type PDRegionStats struct {
func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return err
return errors.Trace(err)
}

startKey := tablecodec.EncodeTablePrefix(tableID)
Expand All @@ -891,16 +892,246 @@ func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error {

resp, err := util.InternalHTTPClient().Get(statURL)
if err != nil {
return err
return errors.Trace(err)
}
defer func() {
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()

dec := json.NewDecoder(resp.Body)

return dec.Decode(stats)
}

// DeletePlacementRule is to delete placement rule for certain group.
func (h *Helper) DeletePlacementRule(group string, ruleID string) error {
pdAddrs, err := h.GetPDAddr()
if err != nil {
return errors.Trace(err)
}

deleteURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule/%v/%v",
util.InternalHTTPSchema(),
pdAddrs[0],
group,
ruleID,
)

req, err := http.NewRequest("DELETE", deleteURL, nil)
if err != nil {
return errors.Trace(err)
}

resp, err := util.InternalHTTPClient().Do(req)
if err != nil {
return errors.Trace(err)
}
defer func() {
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
if err = resp.Body.Close(); err != nil {
logutil.BgLogger().Error("err", zap.Error(err))
}
}()
if resp.StatusCode != http.StatusOK {
return errors.New("DeletePlacementRule returns error")
}
return nil
}

// SetPlacementRule is a helper function to set placement rule.
// The rule is JSON-encoded and POSTed to PD's `/pd/api/v1/config/rule` endpoint;
// any non-200 response is reported as an error.
func (h *Helper) SetPlacementRule(rule placement.Rule) error {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return errors.Trace(err)
	}
	// Do not silently discard the marshal error (the old code used `m, _ :=`).
	m, err := json.Marshal(rule)
	if err != nil {
		return errors.Trace(err)
	}

	postURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule",
		util.InternalHTTPSchema(),
		pdAddrs[0],
	)
	buf := bytes.NewBuffer(m)
	resp, err := util.InternalHTTPClient().Post(postURL, "application/json", buf)
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			logutil.BgLogger().Error("err", zap.Error(err))
		}
	}()
	if resp.StatusCode != http.StatusOK {
		return errors.New("SetPlacementRule returns error")
	}
	return nil
}

// GetGroupRules to get all placement rule in a certain group.
// It fetches `/pd/api/v1/config/rules/group/{group}` from PD and decodes the
// JSON response into a slice of placement rules.
func (h *Helper) GetGroupRules(group string) ([]placement.Rule, error) {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return nil, errors.Trace(err)
	}

	getURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rules/group/%s",
		util.InternalHTTPSchema(),
		pdAddrs[0],
		group,
	)

	resp, err := util.InternalHTTPClient().Get(getURL)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			logutil.BgLogger().Error("err", zap.Error(err))
		}
	}()

	if resp.StatusCode != http.StatusOK {
		return nil, errors.New("GetGroupRules returns error")
	}

	// Decode straight from the response body instead of buffering the whole
	// payload and unmarshalling it in a second step.
	var rules []placement.Rule
	if err = json.NewDecoder(resp.Body).Decode(&rules); err != nil {
		return nil, errors.Trace(err)
	}

	return rules, nil
}

// PostAccelerateSchedule sends `regions/accelerate-schedule` request.
// The key range covers the record keys of tableID: [t<id>_r, t<id+1>).
func (h *Helper) PostAccelerateSchedule(tableID int64) error {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return errors.Trace(err)
	}
	startKey := tablecodec.GenTableRecordPrefix(tableID)
	endKey := tablecodec.EncodeTablePrefix(tableID + 1)
	startKey = codec.EncodeBytes([]byte{}, startKey)
	endKey = codec.EncodeBytes([]byte{}, endKey)

	postURL := fmt.Sprintf("%s://%s/pd/api/v1/regions/accelerate-schedule",
		util.InternalHTTPSchema(),
		pdAddrs[0])

	input := map[string]string{
		"start_key": url.QueryEscape(string(startKey)),
		"end_key":   url.QueryEscape(string(endKey)),
	}
	v, err := json.Marshal(input)
	if err != nil {
		return errors.Trace(err)
	}
	resp, err := util.InternalHTTPClient().Post(postURL, "application/json", bytes.NewBuffer(v))
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			logutil.BgLogger().Error("err", zap.Error(err))
		}
	}()
	// Check the status code like the other PD helpers do; the old code
	// ignored non-200 responses entirely.
	if resp.StatusCode != http.StatusOK {
		return errors.New("PostAccelerateSchedule returns error")
	}
	return nil
}

// GetPDRegionRecordStats is a helper function calling `/stats/region`.
// The queried range is the record-key range of tableID: [t<id>_r, t<id+1>).
func (h *Helper) GetPDRegionRecordStats(tableID int64, stats *PDRegionStats) error {
	pdAddrs, err := h.GetPDAddr()
	if err != nil {
		return errors.Trace(err)
	}

	// Encode the range boundaries the same way PD stores region keys.
	rangeStart := codec.EncodeBytes([]byte{}, tablecodec.GenTableRecordPrefix(tableID))
	rangeEnd := codec.EncodeBytes([]byte{}, tablecodec.EncodeTablePrefix(tableID+1))

	statURL := fmt.Sprintf("%s://%s/pd/api/v1/stats/region?start_key=%s&end_key=%s",
		util.InternalHTTPSchema(),
		pdAddrs[0],
		url.QueryEscape(string(rangeStart)),
		url.QueryEscape(string(rangeEnd)))

	resp, err := util.InternalHTTPClient().Get(statURL)
	if err != nil {
		return errors.Trace(err)
	}
	defer func() {
		if err = resp.Body.Close(); err != nil {
			logutil.BgLogger().Error("err", zap.Error(err))
		}
	}()

	return json.NewDecoder(resp.Body).Decode(stats)
}

// GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey.
func GetTiFlashTableIDFromEndKey(endKey string) int64 {
endKey, _ = url.QueryUnescape(endKey)
_, decodedEndKey, _ := codec.DecodeBytes([]byte(endKey), []byte{})
tableID := tablecodec.DecodeTableID(decodedEndKey)
tableID -= 1
CalvinNeo marked this conversation as resolved.
Show resolved Hide resolved
return tableID
}

// ComputeTiFlashStatus is helper function for CollectTiFlashStatus.
func ComputeTiFlashStatus(reader *bufio.Reader, regionReplica *map[int64]int) error {
ns, _, _ := reader.ReadLine()
n, err := strconv.ParseInt(string(ns), 10, 64)
if err != nil {
return errors.Trace(err)
}
for i := int64(0); i < n; i++ {
rs, _, _ := reader.ReadLine()
// For (`table`, `store`), has region `r`
r, err := strconv.ParseInt(strings.Trim(string(rs), "\r\n \t"), 10, 32)
if err != nil {
return errors.Trace(err)
}
if i, ok := (*regionReplica)[r]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i is double declared, better change name

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will replace i in later PR

(*regionReplica)[r] = i + 1
} else {
(*regionReplica)[r] = 1
}
}
return nil
}

// CollectTiFlashStatus query sync status of one table from TiFlash store.
// `regionReplica` is a map from RegionID to count of TiFlash Replicas in this region.
func CollectTiFlashStatus(statusAddress string, tableID int64, regionReplica *map[int64]int) error {
	statURL := fmt.Sprintf("%s://%s/tiflash/sync-status/%d",
		util.InternalHTTPSchema(),
		statusAddress,
		tableID,
	)
	resp, err := util.InternalHTTPClient().Get(statURL)
	if err != nil {
		return errors.Trace(err)
	}

	defer func() {
		if closeErr := resp.Body.Close(); closeErr != nil {
			logutil.BgLogger().Error("close body failed", zap.Error(closeErr))
		}
	}()

	if err = ComputeTiFlashStatus(bufio.NewReader(resp.Body), regionReplica); err != nil {
		return errors.Trace(err)
	}
	return nil
}
Loading