diff --git a/ddl/ddl.go b/ddl/ddl.go index ebec94019105b..2277e35ac4b0b 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -47,6 +47,7 @@ import ( "github.com/pingcap/tidb/table" goutil "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/admin" + "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/logutil" "go.etcd.io/etcd/clientv3" "go.uber.org/zap" @@ -731,3 +732,39 @@ func init() { RunInGoTest = true } } + +// GetDropOrTruncateTableInfoFromJobsByStore implements GetDropOrTruncateTableInfoFromJobs +func GetDropOrTruncateTableInfoFromJobsByStore(jobs []*model.Job, gcSafePoint uint64, getTable func(uint64, int64, int64) (*model.TableInfo, error), fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) { + for _, job := range jobs { + // Check GC safe point for getting snapshot infoSchema. + err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint) + if err != nil { + return false, err + } + if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable { + continue + } + + tbl, err := getTable(job.StartTS, job.SchemaID, job.TableID) + if err != nil { + if meta.ErrDBNotExists.Equal(err) { + // The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution, + // then can't find the table from the snapshot info-schema. Should just ignore error here, + // see more in TestParallelDropSchemaAndDropTable. + continue + } + return false, err + } + if tbl == nil { + // The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution, + // then can't find the table from the snapshot info-schema. Should just ignore error here, + // see more in TestParallelDropSchemaAndDropTable. 
+ continue + } + finish, err := fn(job, tbl) + if err != nil || finish { + return finish, err + } + } + return false, nil +} diff --git a/executor/ddl.go b/executor/ddl.go index 255580c04d93b..ac97d95d2fa5a 100644 --- a/executor/ddl.go +++ b/executor/ddl.go @@ -710,42 +710,15 @@ func (e *DDLExec) getRecoverTableByJobID(s *ast.RecoverTableStmt, t *meta.Meta, // GetDropOrTruncateTableInfoFromJobs gets the dropped/truncated table information from DDL jobs, // it will use the `start_ts` of DDL job as snapshot to get the dropped/truncated table information. func GetDropOrTruncateTableInfoFromJobs(jobs []*model.Job, gcSafePoint uint64, dom *domain.Domain, fn func(*model.Job, *model.TableInfo) (bool, error)) (bool, error) { - for _, job := range jobs { - // Check GC safe point for getting snapshot infoSchema. - err := gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint) + getTable := func(StartTS uint64, SchemaID int64, TableID int64) (*model.TableInfo, error) { + snapMeta, err := dom.GetSnapshotMeta(StartTS) if err != nil { - return false, err - } - if job.Type != model.ActionDropTable && job.Type != model.ActionTruncateTable { - continue - } - - snapMeta, err := dom.GetSnapshotMeta(job.StartTS) - if err != nil { - return false, err - } - tbl, err := snapMeta.GetTable(job.SchemaID, job.TableID) - if err != nil { - if meta.ErrDBNotExists.Equal(err) { - // The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution, - // then can't find the table from the snapshot info-schema. Should just ignore error here, - // see more in TestParallelDropSchemaAndDropTable. - continue - } - return false, err - } - if tbl == nil { - // The dropped/truncated DDL maybe execute failed that caused by the parallel DDL execution, - // then can't find the table from the snapshot info-schema. Should just ignore error here, - // see more in TestParallelDropSchemaAndDropTable. 
- continue - } - finish, err := fn(job, tbl) - if err != nil || finish { - return finish, err + return nil, err } + tbl, err := snapMeta.GetTable(SchemaID, TableID) + return tbl, err } - return false, nil + return ddl.GetDropOrTruncateTableInfoFromJobsByStore(jobs, gcSafePoint, getTable, fn) } func (e *DDLExec) getRecoverTableByTableName(tableName *ast.TableName) (*model.Job, *model.TableInfo, error) { diff --git a/store/helper/helper.go b/store/helper/helper.go index 125052d10cd75..00706047eb145 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -15,6 +15,7 @@ package helper import ( + "bufio" "bytes" "context" "encoding/hex" @@ -32,7 +33,7 @@ import ( "github.com/pingcap/errors" deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/log" + "github.com/pingcap/tidb/ddl/placement" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/parser/model" @@ -875,7 +876,7 @@ type PDRegionStats struct { func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error { pdAddrs, err := h.GetPDAddr() if err != nil { - return err + return errors.Trace(err) } startKey := tablecodec.EncodeTablePrefix(tableID) @@ -891,12 +892,183 @@ func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error { resp, err := util.InternalHTTPClient().Get(statURL) if err != nil { - return err + return errors.Trace(err) + } + defer func() { + if err = resp.Body.Close(); err != nil { + logutil.BgLogger().Error("err", zap.Error(err)) + } + }() + + dec := json.NewDecoder(resp.Body) + + return dec.Decode(stats) +} + +// DeletePlacementRule is to delete placement rule for certain group. 
+func (h *Helper) DeletePlacementRule(group string, ruleID string) error { + pdAddrs, err := h.GetPDAddr() + if err != nil { + return errors.Trace(err) + } + + deleteURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule/%v/%v", + util.InternalHTTPSchema(), + pdAddrs[0], + group, + ruleID, + ) + + req, err := http.NewRequest("DELETE", deleteURL, nil) + if err != nil { + return errors.Trace(err) + } + + resp, err := util.InternalHTTPClient().Do(req) + if err != nil { + return errors.Trace(err) + } + defer func() { + if err = resp.Body.Close(); err != nil { + logutil.BgLogger().Error("err", zap.Error(err)) + } + }() + if resp.StatusCode != http.StatusOK { + return errors.New("DeletePlacementRule returns error") + } + return nil +} + +// SetPlacementRule is a helper function to set placement rule. +func (h *Helper) SetPlacementRule(rule placement.Rule) error { + pdAddrs, err := h.GetPDAddr() + if err != nil { + return errors.Trace(err) + } + m, _ := json.Marshal(rule) + + postURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rule", + util.InternalHTTPSchema(), + pdAddrs[0], + ) + buf := bytes.NewBuffer(m) + resp, err := util.InternalHTTPClient().Post(postURL, "application/json", buf) + if err != nil { + return errors.Trace(err) + } + defer func() { + if err = resp.Body.Close(); err != nil { + logutil.BgLogger().Error("err", zap.Error(err)) + } + }() + if resp.StatusCode != http.StatusOK { + return errors.New("SetPlacementRule returns error") + } + return nil +} + +// GetGroupRules to get all placement rule in a certain group. 
+func (h *Helper) GetGroupRules(group string) ([]placement.Rule, error) { + pdAddrs, err := h.GetPDAddr() + if err != nil { + return nil, errors.Trace(err) + } + + getURL := fmt.Sprintf("%s://%s/pd/api/v1/config/rules/group/%s", + util.InternalHTTPSchema(), + pdAddrs[0], + group, + ) + + resp, err := util.InternalHTTPClient().Get(getURL) + if err != nil { + return nil, errors.Trace(err) + } + defer func() { + if err = resp.Body.Close(); err != nil { + logutil.BgLogger().Error("err", zap.Error(err)) + } + }() + + if resp.StatusCode != http.StatusOK { + return nil, errors.New("GetGroupRules returns error") + } + + buf := new(bytes.Buffer) + _, err = buf.ReadFrom(resp.Body) + if err != nil { + return nil, errors.Trace(err) + } + + var rules []placement.Rule + err = json.Unmarshal(buf.Bytes(), &rules) + if err != nil { + return nil, errors.Trace(err) + } + + return rules, nil +} + +// PostAccelerateSchedule sends `regions/accelerate-schedule` request. +func (h *Helper) PostAccelerateSchedule(tableID int64) error { + pdAddrs, err := h.GetPDAddr() + if err != nil { + return errors.Trace(err) } + startKey := tablecodec.GenTableRecordPrefix(tableID) + endKey := tablecodec.EncodeTablePrefix(tableID + 1) + startKey = codec.EncodeBytes([]byte{}, startKey) + endKey = codec.EncodeBytes([]byte{}, endKey) + postURL := fmt.Sprintf("%s://%s/pd/api/v1/regions/accelerate-schedule", + util.InternalHTTPSchema(), + pdAddrs[0]) + + input := map[string]string{ + "start_key": url.QueryEscape(string(startKey)), + "end_key": url.QueryEscape(string(endKey)), + } + v, err := json.Marshal(input) + if err != nil { + return errors.Trace(err) + } + resp, err := util.InternalHTTPClient().Post(postURL, "application/json", bytes.NewBuffer(v)) + if err != nil { + return errors.Trace(err) + } defer func() { if err = resp.Body.Close(); err != nil { - log.Error("err", zap.Error(err)) + logutil.BgLogger().Error("err", zap.Error(err)) + } + }() + return nil +} + +// GetPDRegionRecordStats is a helper 
function calling `/stats/region`. +func (h *Helper) GetPDRegionRecordStats(tableID int64, stats *PDRegionStats) error { + pdAddrs, err := h.GetPDAddr() + if err != nil { + return errors.Trace(err) + } + + startKey := tablecodec.GenTableRecordPrefix(tableID) + endKey := tablecodec.EncodeTablePrefix(tableID + 1) + startKey = codec.EncodeBytes([]byte{}, startKey) + endKey = codec.EncodeBytes([]byte{}, endKey) + + statURL := fmt.Sprintf("%s://%s/pd/api/v1/stats/region?start_key=%s&end_key=%s", + util.InternalHTTPSchema(), + pdAddrs[0], + url.QueryEscape(string(startKey)), + url.QueryEscape(string(endKey))) + + resp, err := util.InternalHTTPClient().Get(statURL) + if err != nil { + return errors.Trace(err) + } + defer func() { + if err = resp.Body.Close(); err != nil { + logutil.BgLogger().Error("err", zap.Error(err)) } }() @@ -904,3 +1076,62 @@ func (h *Helper) GetPDRegionStats(tableID int64, stats *PDRegionStats) error { return dec.Decode(stats) } + +// GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey. +func GetTiFlashTableIDFromEndKey(endKey string) int64 { + endKey, _ = url.QueryUnescape(endKey) + _, decodedEndKey, _ := codec.DecodeBytes([]byte(endKey), []byte{}) + tableID := tablecodec.DecodeTableID(decodedEndKey) + tableID -= 1 + return tableID +} + +// ComputeTiFlashStatus is helper function for CollectTiFlashStatus. 
+func ComputeTiFlashStatus(reader *bufio.Reader, regionReplica *map[int64]int) error { + ns, _, _ := reader.ReadLine() + n, err := strconv.ParseInt(string(ns), 10, 64) + if err != nil { + return errors.Trace(err) + } + for i := int64(0); i < n; i++ { + rs, _, _ := reader.ReadLine() + // For (`table`, `store`), has region `r` + r, err := strconv.ParseInt(strings.Trim(string(rs), "\r\n \t"), 10, 32) + if err != nil { + return errors.Trace(err) + } + if i, ok := (*regionReplica)[r]; ok { + (*regionReplica)[r] = i + 1 + } else { + (*regionReplica)[r] = 1 + } + } + return nil +} + +// CollectTiFlashStatus query sync status of one table from TiFlash store. +// `regionReplica` is a map from RegionID to count of TiFlash Replicas in this region. +func CollectTiFlashStatus(statusAddress string, tableID int64, regionReplica *map[int64]int) error { + statURL := fmt.Sprintf("%s://%s/tiflash/sync-status/%d", + util.InternalHTTPSchema(), + statusAddress, + tableID, + ) + resp, err := util.InternalHTTPClient().Get(statURL) + if err != nil { + return errors.Trace(err) + } + + defer func() { + err = resp.Body.Close() + if err != nil { + logutil.BgLogger().Error("close body failed", zap.Error(err)) + } + }() + + reader := bufio.NewReader(resp.Body) + if err = ComputeTiFlashStatus(reader, regionReplica); err != nil { + return errors.Trace(err) + } + return nil +} diff --git a/store/helper/helper_test.go b/store/helper/helper_test.go index 3640b4f479cef..c29873fd81c74 100644 --- a/store/helper/helper_test.go +++ b/store/helper/helper_test.go @@ -15,10 +15,12 @@ package helper_test import ( + "bufio" "crypto/tls" "encoding/json" "net/http" "net/http/httptest" + "strings" "testing" "time" @@ -430,3 +432,21 @@ func mockStoreStatResponse(w http.ResponseWriter, _ *http.Request) { log.Panic("write http response failed", zap.Error(err)) } } + +func TestComputeTiFlashStatus(t *testing.T) { + regionReplica := make(map[int64]int) + // There are no region in this TiFlash store. 
+ resp1 := "0\n\n" + // There is one region 1009 in this TiFlash store. + resp2 := "1\n1009\n" + br1 := bufio.NewReader(strings.NewReader(resp1)) + br2 := bufio.NewReader(strings.NewReader(resp2)) + err := helper.ComputeTiFlashStatus(br1, &regionReplica) + require.NoError(t, err) + err = helper.ComputeTiFlashStatus(br2, &regionReplica) + require.NoError(t, err) + require.Equal(t, len(regionReplica), 1) + v, ok := regionReplica[1009] + require.Equal(t, v, 1) + require.Equal(t, ok, true) +}