From 63a7fec511c160a28137cbf61425ef9e24db10f2 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 6 Aug 2024 17:30:46 +0800 Subject: [PATCH 1/7] a Signed-off-by: Calvin Neo --- Makefile | 2 + pkg/schedule/handler/handler.go | 149 +++++++++++++++++++++++++++++++- server/api/region.go | 12 +++ server/api/router.go | 1 + 4 files changed, 163 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 34c3be775be..5e61abb1300 100644 --- a/Makefile +++ b/Makefile @@ -243,12 +243,14 @@ SUBMODULES := $(filter $(shell find . -iname "go.mod" -exec dirname {} \;),\ test: install-tools # testing all pkgs... @$(FAILPOINT_ENABLE) + echo ${TEST_PKGS} CGO_ENABLED=1 go test -tags tso_function_test,deadlock -timeout 20m -race -cover $(TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } @$(FAILPOINT_DISABLE) basic-test: install-tools # testing basic pkgs... @$(FAILPOINT_ENABLE) + echo ${TEST_PKGS} go test $(BASIC_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; } @$(FAILPOINT_DISABLE) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 9f9de274278..c7074dd6474 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1261,8 +1261,155 @@ func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []any, retryLim return s, nil } +type StoreRegionSet struct { + ID int64 + Info StoreInfo + RegionIDSet map[int64]bool +} + +type MigrationOp struct { + FromStore int64 + ToStore int64 + Regions map[int64]interface{} +} + +func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *MigrationOp { + o := MigrationOp{ + FromStore: fromStore.ID, + ToStore: toStore.ID, + Regions: make(map[int64]interface{}), + } + for r, removed := range fromStore.RegionIDSet { + if n == 0 { + break + } + if removed { + continue + } + if _, exist := toStore.RegionIDSet[r]; !exist { + o.Regions[r] = nil + fromStore.RegionIDSet[r] = true + } + n-- + } + return &o +} + +func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { + totalRegionCount := 0 + for _, store := range stores { + totalRegionCount += len(store.RegionIDSet) + } + for _, store := range stores { + percentage := 100 * float64(len(store.RegionIDSet)) / float64(totalRegionCount) + log.Info("store region dist", + zap.Int64("store-id", store.ID), + zap.Int("num-region", len(store.RegionIDSet)), + zap.String("percentage", fmt.Sprintf("%.2f%%", percentage))) + } + avr := totalRegionCount / len(stores) + remainder := totalRegionCount % len(stores) + // sort TiFlash stores by region count in descending order + slices.SortStableFunc(stores, func(lhs, rhs *StoreRegionSet) int { + return -cmp.Compare(len(lhs.RegionIDSet), len(rhs.RegionIDSet)) + }) + expectedCount := []int{} + for i := 0; i < remainder; i++ { + expectedCount = append(expectedCount, avr+1) + } + for i := remainder; i < len(stores); i++ { + expectedCount = append(expectedCount, avr) + } + senders := []int{} + receivers := []int{} + sendersVolume := []int{} + receiversVolume := []int{} + for i, store := range stores { + if len(store.RegionIDSet) < expectedCount[i] { + receivers = append(receivers, i) + receiversVolume = append(receiversVolume, expectedCount[i]-len(store.RegionIDSet)) + } + if len(store.RegionIDSet) > expectedCount[i] { + senders = append(senders, i) + sendersVolume = append(sendersVolume, len(store.RegionIDSet)-expectedCount[i]) + } + } + + ops := []*MigrationOp{} + + for i, senderIndex := range senders { + fromStore := stores[senderIndex] + for { + if sendersVolume[i] <= 0 { + break + } + for j, receiverIndex := range receivers { + toStore := stores[receiverIndex] + if receiversVolume[j] > 0 { + n := sendersVolume[i] + if n > receiversVolume[j] { + n = receiversVolume[j] + } + receiversVolume[j] -= n + sendersVolume[i] -= n + ops = append(ops, PickRegions(n, fromStore, toStore)) + } + } + } + } + + return senders, receivers, ops +} + +// CheckRegionsReplicated checks if regions are replicated. +func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (string, error) { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return "", err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return "", err + } + c := h.GetCluster() + if c == nil { + return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regions := c.ScanRegions(startKey, endKey, -1) + + stores := c.GetStores() + candidates := make([]*StoreRegionSet) + storeLabelMap := make(map[string]*metapb.StoreLabel) + for _, l := range storeLabels { + storeLabelMap[l.Key] = l + } + for _, s := range stores { + if len(s.GetLabels()) != len(storeLabelMap) { + continue + } + for _, l := s.GetLabels() { + if larg, ok := storeLabelMap[l.Key]; ok { + if larg.Value != l.Value { + continue + } + } else { + continue + } + } + candidates = append(candidates, s) + } + + + + return state, nil +} + // CheckRegionsReplicated checks if regions are replicated. -func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string, error) { +func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string) (string, error) { startKey, err := hex.DecodeString(rawStartKey) if err != nil { return "", err diff --git a/server/api/region.go b/server/api/region.go index c6bc3d9e699..dc69fd82d08 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -128,6 +128,18 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R h.rd.JSON(w, http.StatusOK, state) } +func (h *regionsHandler) BalanceRegion(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + rawStartKey := vars["startKey"] + rawEndKey := vars["endKey"] + state, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey) + if err != nil { + h.rd.JSON(w, http.StatusBadRequest, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, state) +} + type regionsHandler struct { *server.Handler svr *server.Server diff --git a/server/api/router.go b/server/api/router.go index 7aef165b267..e8b12e88cca 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -284,6 +284,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/replicated", regionsHandler.CheckRegionsReplicated, setMethods(http.MethodGet), setQueries("startKey", "{startKey}", "endKey", "{endKey}"), setAuditBackend(prometheus)) + registerFunc(clusterRouter, "/regions/balance", regionsHandler.BalanceRegion, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/version", newVersionHandler(rd).GetVersion, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(apiRouter, "/status", newStatusHandler(svr, rd).GetPDStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) From e49630bf6cb447568a1e6c967457720d30ac3e28 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 6 Aug 2024 20:20:17 +0800 Subject: [PATCH 2/7] more Signed-off-by: Calvin Neo --- pkg/schedule/handler/handler.go | 60 ++++++++++++++++++++++++++------- 1 file changed, 47 insertions(+), 13 deletions(-) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index c7074dd6474..1fc9d68cc6e 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -16,9 +16,12 @@ package handler import ( "bytes" + "cmp" "context" "encoding/hex" + "fmt" "net/http" + "slices" "strconv" "strings" "time" @@ -1262,22 +1265,26 @@ func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []any, retryLim } type StoreRegionSet struct { - ID int64 - Info StoreInfo - RegionIDSet map[int64]bool + ID uint64 + Info core.StoreInfo + RegionIDSet map[uint64]bool + OriginalPeer map[uint64]*metapb.Peer } type MigrationOp struct { - FromStore int64 - ToStore int64 - Regions map[int64]interface{} + FromStore uint64 + ToStore uint64 + ToStoreInfo *core.StoreInfo + OriginalPeer *metapb.Peer + Regions map[uint64]interface{} } func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *MigrationOp { o := MigrationOp{ - FromStore: fromStore.ID, - ToStore: toStore.ID, - Regions: make(map[int64]interface{}), + FromStore: fromStore.ID, + ToStore: toStore.ID, + ToStoreInfo: toStore.Info, + Regions: make(map[uint64]interface{}), } for r, removed := range fromStore.RegionIDSet { if n == 0 { @@ -1288,6 +1295,7 @@ func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *Mig } if _, exist := toStore.RegionIDSet[r]; !exist { o.Regions[r] = nil + o.OriginalPeer = fromStore.OriginalPeer[r] fromStore.RegionIDSet[r] = true } n-- @@ -1303,7 +1311,7 @@ func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { for _, store := range stores { percentage := 100 * float64(len(store.RegionIDSet)) / float64(totalRegionCount) log.Info("store region dist", - zap.Int64("store-id", store.ID), + zap.Uint64("store-id", store.ID), zap.Int("num-region", len(store.RegionIDSet)), zap.String("percentage", fmt.Sprintf("%.2f%%", percentage))) } @@ -1380,6 +1388,10 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLab return "", errs.ErrNotBootstrapped.GenWithStackByArgs() } regions := c.ScanRegions(startKey, endKey, -1) + regionIdMap := make(map[uint64]*core.RegionInfo) + for _, r := range regions { + regionIdMap[r.GetID()] = r + } stores := c.GetStores() candidates := make([]*StoreRegionSet) @@ -1391,7 +1403,7 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLab if len(s.GetLabels()) != len(storeLabelMap) { continue } - for _, l := s.GetLabels() { + for _, l := range s.GetLabels() { if larg, ok := storeLabelMap[l.Key]; ok { if larg.Value != l.Value { continue @@ -1400,11 +1412,33 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLab continue } } - candidates = append(candidates, s) + candidate := StoreRegionSet{ + ID: s.GetID(), + Info: s, + RegionIDSet: make(map[uint64]bool), + OriginalPeer: make(map[uint64]*metapb.Peer), + } + for _, r := range regions { + for _, p := range r.GetPeers() { + if p.StoreId == s.GetID() { + candidate.RegionIDSet[r.GetID()] = false + } + } + } + if len(candidate.RegionIDSet) > 0 { + candidates = append(candidates, candidate) + } } + senders, receivers, ops := MigrationPlan(candidates) - + for _, op := range ops { + for r, _ := range op.Regions { + newPeer := &metapb.Peer{StoreId: op.ToStore, Role: op.OriginalPeer.Role, IsWitness: op.OriginalPeer.IsWitness} + o := operator.CreateMovePeerOperator("balance-region", c, r, operator.OpReplica, op.FromStore, newPeer) + co.GetOperatorController().AddOperator(o) + } + } return state, nil } From 3e683fe90fd16227c47f824671a3d2cdb2f5dfab Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Tue, 6 Aug 2024 20:47:05 +0800 Subject: [PATCH 3/7] a Signed-off-by: Calvin Neo --- pkg/schedule/handler/handler.go | 23 +++++++++++++---------- server/api/region.go | 4 +++- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 1fc9d68cc6e..413d5f26199 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1266,7 +1266,7 @@ func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []any, retryLim type StoreRegionSet struct { ID uint64 - Info core.StoreInfo + Info *core.StoreInfo RegionIDSet map[uint64]bool OriginalPeer map[uint64]*metapb.Peer } @@ -1369,8 +1369,8 @@ func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { return senders, receivers, ops } -// CheckRegionsReplicated checks if regions are replicated. -func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (string, error) { +// BalanceRegion checks if regions are imbalanced and rebalance them. +func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (string, error) { startKey, err := hex.DecodeString(rawStartKey) if err != nil { return "", err @@ -1394,7 +1394,7 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLab } stores := c.GetStores() - candidates := make([]*StoreRegionSet) + candidates := make([]*StoreRegionSet, 0) storeLabelMap := make(map[string]*metapb.StoreLabel) for _, l := range storeLabels { storeLabelMap[l.Key] = l @@ -1412,7 +1412,7 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLab continue } } - candidate := StoreRegionSet{ + candidate := &StoreRegionSet{ ID: s.GetID(), Info: s, RegionIDSet: make(map[uint64]bool), @@ -1430,20 +1430,23 @@ func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string, storeLab } } - senders, receivers, ops := MigrationPlan(candidates) + _, _, ops := MigrationPlan(candidates) for _, op := range ops { - for r, _ := range op.Regions { + for rid, _ := range op.Regions { newPeer := &metapb.Peer{StoreId: op.ToStore, Role: op.OriginalPeer.Role, IsWitness: op.OriginalPeer.IsWitness} - o := operator.CreateMovePeerOperator("balance-region", c, r, operator.OpReplica, op.FromStore, newPeer) + o, err := operator.CreateMovePeerOperator("balance-region", c, regionIdMap[rid], operator.OpReplica, op.FromStore, newPeer) + if err != nil { + return "", err + } co.GetOperatorController().AddOperator(o) } } - return state, nil + return "", nil } // CheckRegionsReplicated checks if regions are replicated. -func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string) (string, error) { +func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string, error) { startKey, err := hex.DecodeString(rawStartKey) if err != nil { return "", err diff --git a/server/api/region.go b/server/api/region.go index dc69fd82d08..a1973254fd2 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -25,6 +25,7 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" @@ -132,7 +133,8 @@ func (h *regionsHandler) BalanceRegion(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) rawStartKey := vars["startKey"] rawEndKey := vars["endKey"] - state, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey) + storeLabels := make([]*metapb.StoreLabel, 0) + state, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey, storeLabels) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return From cdd686af60960eca1af6c0e7ec97da47086f8f7a Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 21 Aug 2024 14:37:46 +0800 Subject: [PATCH 4/7] fix tests Signed-off-by: Calvin Neo --- pkg/schedule/handler/handler.go | 64 ++++++++++++++++++++----------- tests/server/api/operator_test.go | 52 +++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 22 deletions(-) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index 413d5f26199..e02ea27d03e 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1276,7 +1276,7 @@ type MigrationOp struct { ToStore uint64 ToStoreInfo *core.StoreInfo OriginalPeer *metapb.Peer - Regions map[uint64]interface{} + Regions map[uint64]bool } func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *MigrationOp { @@ -1284,7 +1284,7 @@ func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *Mig FromStore: fromStore.ID, ToStore: toStore.ID, ToStoreInfo: toStore.Info, - Regions: make(map[uint64]interface{}), + Regions: make(map[uint64]bool), } for r, removed := range fromStore.RegionIDSet { if n == 0 { @@ -1294,23 +1294,29 @@ func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *Mig continue } if _, exist := toStore.RegionIDSet[r]; !exist { - o.Regions[r] = nil + // If toStore doesn't has this region, then create a move op. + o.Regions[r] = false o.OriginalPeer = fromStore.OriginalPeer[r] + log.Info("!!!! Pick S", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore), zap.Any("OriginalPeer", fromStore.OriginalPeer[r])) fromStore.RegionIDSet[r] = true + n-- + } else { + log.Info("!!!! Pick", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore)) } - n-- } return &o } func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { + log.Info("!!! MigrationPlan", + zap.Any("store-id", stores)) totalRegionCount := 0 for _, store := range stores { totalRegionCount += len(store.RegionIDSet) } for _, store := range stores { percentage := 100 * float64(len(store.RegionIDSet)) / float64(totalRegionCount) - log.Info("store region dist", + log.Info("!!! store region dist", zap.Uint64("store-id", store.ID), zap.Int("num-region", len(store.RegionIDSet)), zap.String("percentage", fmt.Sprintf("%.2f%%", percentage))) @@ -1328,6 +1334,8 @@ func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { for i := remainder; i < len(stores); i++ { expectedCount = append(expectedCount, avr) } + + log.Info("!!! expectedCount", zap.Any("expectedCount", expectedCount)) senders := []int{} receivers := []int{} sendersVolume := []int{} @@ -1388,30 +1396,39 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me return "", errs.ErrNotBootstrapped.GenWithStackByArgs() } regions := c.ScanRegions(startKey, endKey, -1) - regionIdMap := make(map[uint64]*core.RegionInfo) + regionIDMap := make(map[uint64]*core.RegionInfo) for _, r := range regions { - regionIdMap[r.GetID()] = r + regionIDMap[r.GetID()] = r } stores := c.GetStores() candidates := make([]*StoreRegionSet, 0) - storeLabelMap := make(map[string]*metapb.StoreLabel) - for _, l := range storeLabels { - storeLabelMap[l.Key] = l - } for _, s := range stores { - if len(s.GetLabels()) != len(storeLabelMap) { + storeLabelMap := make(map[string]*metapb.StoreLabel) + for _, l := range s.GetLabels() { + storeLabelMap[l.Key] = l + } + if len(storeLabels) != len(storeLabelMap) { continue } - for _, l := range s.GetLabels() { - if larg, ok := storeLabelMap[l.Key]; ok { + log.Info("!!!! store pass 1", zap.Any("s", s), zap.Any("l", s.GetLabels()), zap.Any("id", s.GetID())) + gotLabels := true + for _, larg := range storeLabels { + if l, ok := storeLabelMap[larg.Key]; ok { if larg.Value != l.Value { - continue + gotLabels = false + break } } else { - continue + gotLabels = false + break } } + + if !gotLabels { + continue + } + log.Info("!!!! store pass 2", zap.Any("s", s)) candidate := &StoreRegionSet{ ID: s.GetID(), Info: s, @@ -1422,20 +1439,23 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me for _, p := range r.GetPeers() { if p.StoreId == s.GetID() { candidate.RegionIDSet[r.GetID()] = false + candidate.OriginalPeer[r.GetID()] = p } } } - if len(candidate.RegionIDSet) > 0 { - candidates = append(candidates, candidate) - } + log.Info("!!!! store pass 3", zap.Any("s", s)) + candidates = append(candidates, candidate) } - _, _, ops := MigrationPlan(candidates) + senders, receivers, ops := MigrationPlan(candidates) + + log.Info("Migration plan details", zap.Any("senders", senders), zap.Any("receivers", receivers), zap.Any("ops", ops)) for _, op := range ops { - for rid, _ := range op.Regions { + for rid := range op.Regions { newPeer := &metapb.Peer{StoreId: op.ToStore, Role: op.OriginalPeer.Role, IsWitness: op.OriginalPeer.IsWitness} - o, err := operator.CreateMovePeerOperator("balance-region", c, regionIdMap[rid], operator.OpReplica, op.FromStore, newPeer) + log.Debug("Create balace region op", zap.Uint64("from", op.FromStore), zap.Uint64("to", op.ToStore), zap.Uint64("region_id", rid)) + o, err := operator.CreateMovePeerOperator("balance-region", c, regionIDMap[rid], operator.OpReplica, op.FromStore, newPeer) if err != nil { return "", err } diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 857ea6b3cdd..4fa028d64bf 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -653,3 +653,55 @@ func (suite *operatorTestSuite) checkRemoveOperators(cluster *tests.TestCluster) err = tu.CheckGetJSON(tests.TestDialClient, url, nil, tu.StatusOK(re), tu.StringNotContain(re, "merge: region 10 to 20"), tu.StringNotContain(re, "add peer: store [4]")) re.NoError(err) } + +func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) { + re := suite.Require() + stores := []*metapb.Store{ + { + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + { + Id: 4, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + LastHeartbeat: time.Now().UnixNano(), + }, + } + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + pauseAllCheckers(re, cluster) + r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) + tests.MustPutRegionInfo(re, cluster, r1) + r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) + tests.MustPutRegionInfo(re, cluster, r2) + r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) + tests.MustPutRegionInfo(re, cluster, r3) + + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + // err := tu.CheckGetJSON(tests.TestDialClient, fmt.Sprintf("%s/stores", urlPrefix), []byte(``), tu.StringContain(re, "add peer: store [4]")) + // re.NoError(err) + e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re)) + re.NoError(e) +} + +func (suite *operatorTestSuite) TestBalanceRegions() { + // use a new environment to avoid being affected by other tests + env := tests.NewSchedulingTestEnvironment(suite.T(), + func(conf *config.Config, _ string) { + conf.Replication.MaxReplicas = 1 + }) + env.RunTestBasedOnMode(suite.checkBalanceRegions) + env.Cleanup() +} From 9bfe8b6a27a9bcc2b69b29b4abe4cde3174469f8 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 21 Aug 2024 15:12:24 +0800 Subject: [PATCH 5/7] fix Signed-off-by: Calvin Neo --- pkg/schedule/handler/handler.go | 35 ++++++++++++++++++++----------- pkg/utils/testutil/api_check.go | 3 +++ server/api/region.go | 4 ++-- tests/server/api/operator_test.go | 7 ++++--- 4 files changed, 32 insertions(+), 17 deletions(-) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index e02ea27d03e..bd2afc2973c 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1272,11 +1272,11 @@ type StoreRegionSet struct { } type MigrationOp struct { - FromStore uint64 - ToStore uint64 - ToStoreInfo *core.StoreInfo - OriginalPeer *metapb.Peer - Regions map[uint64]bool + FromStore uint64 `json:"from_store"` + ToStore uint64 `json:"to_store"` + ToStoreInfo *core.StoreInfo `json:"to_store_info"` + OriginalPeer *metapb.Peer `json:"original_peer"` + Regions map[uint64]bool `json:"regions"` } func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *MigrationOp { @@ -1377,23 +1377,28 @@ func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { return senders, receivers, ops } +type MigrationResult struct { + ErrorCode uint64 `json:"error_code"` + Ops []*MigrationOp `json:"ops"` +} + // BalanceRegion checks if regions are imbalanced and rebalance them. -func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (string, error) { +func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (MigrationResult, error) { startKey, err := hex.DecodeString(rawStartKey) if err != nil { - return "", err + return MigrationResult{ErrorCode: 1, Ops: nil}, err } endKey, err := hex.DecodeString(rawEndKey) if err != nil { - return "", err + return MigrationResult{ErrorCode: 1, Ops: nil}, err } c := h.GetCluster() if c == nil { - return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() } co := h.GetCoordinator() if co == nil { - return "", errs.ErrNotBootstrapped.GenWithStackByArgs() + return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() } regions := c.ScanRegions(startKey, endKey, -1) regionIDMap := make(map[uint64]*core.RegionInfo) @@ -1457,12 +1462,18 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me log.Debug("Create balace region op", zap.Uint64("from", op.FromStore), zap.Uint64("to", op.ToStore), zap.Uint64("region_id", rid)) o, err := operator.CreateMovePeerOperator("balance-region", c, regionIDMap[rid], operator.OpReplica, op.FromStore, newPeer) if err != nil { - return "", err + return MigrationResult{ErrorCode: 1, Ops: nil}, err } co.GetOperatorController().AddOperator(o) } } - return "", nil + + result := MigrationResult{ + ErrorCode: 0, + Ops: ops, + } + + return result, nil } // CheckRegionsReplicated checks if regions are replicated. diff --git a/pkg/utils/testutil/api_check.go b/pkg/utils/testutil/api_check.go index 0b714204500..5e44d661c1c 100644 --- a/pkg/utils/testutil/api_check.go +++ b/pkg/utils/testutil/api_check.go @@ -19,8 +19,10 @@ import ( "io" "net/http" + "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/apiutil" + "go.uber.org/zap" ) // Status is used to check whether http response code is equal given code. @@ -45,6 +47,7 @@ func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) { // ExtractJSON is used to check whether given data can be extracted successfully. func ExtractJSON(re *require.Assertions, data any) func([]byte, int, http.Header) { return func(resp []byte, _ int, _ http.Header) { + log.Info("!!!!! ffdfdfdfd", zap.Any("a", string(resp))) re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp)) } } diff --git a/server/api/region.go b/server/api/region.go index a1973254fd2..6c60d7fcd7b 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -134,12 +134,12 @@ func (h *regionsHandler) BalanceRegion(w http.ResponseWriter, r *http.Request) { rawStartKey := vars["startKey"] rawEndKey := vars["endKey"] storeLabels := make([]*metapb.StoreLabel, 0) - state, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey, storeLabels) + result, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey, storeLabels) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return } - h.rd.JSON(w, http.StatusOK, state) + h.rd.JSON(w, http.StatusOK, result) } type regionsHandler struct { diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 4fa028d64bf..2b4f020badb 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" + "github.com/tikv/pd/pkg/schedule/handler" "github.com/tikv/pd/pkg/schedule/operator" "github.com/tikv/pd/pkg/schedule/placement" tu "github.com/tikv/pd/pkg/utils/testutil" @@ -682,6 +683,7 @@ func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) } pauseAllCheckers(re, cluster) + result := handler.MigrationResult{} r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) tests.MustPutRegionInfo(re, cluster, r1) r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) @@ -690,10 +692,9 @@ func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) tests.MustPutRegionInfo(re, cluster, r3) urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) - // err := tu.CheckGetJSON(tests.TestDialClient, fmt.Sprintf("%s/stores", urlPrefix), []byte(``), tu.StringContain(re, "add peer: store [4]")) - // re.NoError(err) - e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re)) + e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result)) re.NoError(e) + re.Equal(2, len(result.Ops)) } func (suite *operatorTestSuite) TestBalanceRegions() { From 9afa987bd2bf462d8943195683f9c9a463018eff Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Wed, 21 Aug 2024 16:36:51 +0800 Subject: [PATCH 6/7] more fix Signed-off-by: Calvin Neo --- pkg/schedule/handler/handler.go | 15 +--- pkg/utils/testutil/api_check.go | 3 - tests/server/api/operator_test.go | 145 ++++++++++++++++++++++++------ 3 files changed, 122 insertions(+), 41 deletions(-) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index bd2afc2973c..c92cd2c67ea 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1297,19 +1297,14 @@ func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *Mig // If toStore doesn't has this region, then create a move op. o.Regions[r] = false o.OriginalPeer = fromStore.OriginalPeer[r] - log.Info("!!!! Pick S", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore), zap.Any("OriginalPeer", fromStore.OriginalPeer[r])) fromStore.RegionIDSet[r] = true n-- - } else { - log.Info("!!!! Pick", zap.Any("r", r), zap.Any("fr", fromStore), zap.Any("to", toStore)) } } return &o } func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { - log.Info("!!! MigrationPlan", - zap.Any("store-id", stores)) totalRegionCount := 0 for _, store := range stores { totalRegionCount += len(store.RegionIDSet) @@ -1335,7 +1330,6 @@ func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) { expectedCount = append(expectedCount, avr) } - log.Info("!!! expectedCount", zap.Any("expectedCount", expectedCount)) senders := []int{} receivers := []int{} sendersVolume := []int{} @@ -1383,7 +1377,7 @@ type MigrationResult struct { } // BalanceRegion checks if regions are imbalanced and rebalance them. -func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*metapb.StoreLabel) (MigrationResult, error) { +func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) { startKey, err := hex.DecodeString(rawStartKey) if err != nil { return MigrationResult{ErrorCode: 1, Ops: nil}, err @@ -1413,12 +1407,11 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me for _, l := range s.GetLabels() { storeLabelMap[l.Key] = l } - if len(storeLabels) != len(storeLabelMap) { + if len(requiredLabels) != len(storeLabelMap) { continue } - log.Info("!!!! store pass 1", zap.Any("s", s), zap.Any("l", s.GetLabels()), zap.Any("id", s.GetID())) gotLabels := true - for _, larg := range storeLabels { + for _, larg := range requiredLabels { if l, ok := storeLabelMap[larg.Key]; ok { if larg.Value != l.Value { gotLabels = false @@ -1433,7 +1426,6 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me if !gotLabels { continue } - log.Info("!!!! store pass 2", zap.Any("s", s)) candidate := &StoreRegionSet{ ID: s.GetID(), Info: s, @@ -1448,7 +1440,6 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, storeLabels []*me } } } - log.Info("!!!! store pass 3", zap.Any("s", s)) candidates = append(candidates, candidate) } diff --git a/pkg/utils/testutil/api_check.go b/pkg/utils/testutil/api_check.go index 5e44d661c1c..0b714204500 100644 --- a/pkg/utils/testutil/api_check.go +++ b/pkg/utils/testutil/api_check.go @@ -19,10 +19,8 @@ import ( "io" "net/http" - "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/tikv/pd/pkg/utils/apiutil" - "go.uber.org/zap" ) // Status is used to check whether http response code is equal given code. @@ -47,7 +45,6 @@ func StatusNotOK(re *require.Assertions) func([]byte, int, http.Header) { // ExtractJSON is used to check whether given data can be extracted successfully. func ExtractJSON(re *require.Assertions, data any) func([]byte, int, http.Header) { return func(resp []byte, _ int, _ http.Header) { - log.Info("!!!!! ffdfdfdfd", zap.Any("a", string(resp))) re.NoError(json.Unmarshal(resp, data), "resp: "+string(resp)) } } diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 2b4f020badb..7a8105f401e 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -18,6 +18,7 @@ import ( "encoding/json" "errors" "fmt" + "slices" "sort" "strconv" "strings" @@ -655,28 +656,90 @@ func (suite *operatorTestSuite) checkRemoveOperators(cluster *tests.TestCluster) re.NoError(err) } -func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) { - re := suite.Require() - stores := []*metapb.Store{ - { - Id: 1, - State: metapb.StoreState_Up, - NodeState: metapb.NodeState_Serving, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 2, - State: metapb.StoreState_Up, - NodeState: metapb.NodeState_Serving, - LastHeartbeat: time.Now().UnixNano(), - }, - { - Id: 4, +type regionStoresPair struct { + RegionId uint64 + StorePos []uint64 +} + +func buildBalanceRegionTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) { + stores := []*metapb.Store{} + regions := []*core.RegionInfo{} + for _, i := range storeIDs { + stores = append(stores, &metapb.Store{ + Id: i, State: metapb.StoreState_Up, NodeState: metapb.NodeState_Serving, LastHeartbeat: time.Now().UnixNano(), - }, + }) + } + + var peerIdAllocator uint64 + peerIdAllocator = 10000 + for _, p := range regionDist { + regionId := p.RegionId + holdingStores := p.StorePos + var peers []*metapb.Peer + for _, storePos := range holdingStores { + s := stores[storePos] + peerIdAllocator += 1 + peers = append(peers, &metapb.Peer{ + StoreId: s.GetId(), + Id: peerIdAllocator, + }) + } + region := core.NewTestRegionInfo(regionId, stores[holdingStores[0]].GetId(), []byte(fmt.Sprintf("r%v", regionId)), []byte(fmt.Sprintf("r%v", regionId+1)), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1), core.SetPeers(peers)) + regions = append(regions, region) + } + + return stores, regions +} + +func validateMigtationOut(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 { + r := make(map[uint64]interface{}) + for _, op := range ops { + for _, sid := range storeIDs { + if op.FromStore == sid { + for k, _ := range op.Regions { + r[k] = nil + } + } + } + } + rl := []uint64{} + for k, _ := range r { + rl = append(rl, k) + } + slices.Sort(rl) + return rl +} + +func validateMigtationIn(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 { + r := make(map[uint64]interface{}) + for _, op := range ops { + for _, sid := range storeIDs { + if op.ToStore == sid { + for k, _ := range op.Regions { + r[k] = nil + } + } + } } + rl := []uint64{} + for k, _ := range r { + rl = append(rl, k) + } + slices.Sort(rl) + return rl +} + +func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) { + re := suite.Require() + + stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + {10, []uint64{0}}, + {20, []uint64{0}}, + {30, []uint64{0}}, + }) for _, store := range stores { tests.MustPutStore(re, cluster, store) @@ -684,12 +747,9 @@ func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) pauseAllCheckers(re, cluster) result := handler.MigrationResult{} - r1 := core.NewTestRegionInfo(10, 1, []byte(""), []byte("b"), core.SetWrittenBytes(1000), core.SetReadBytes(1000), core.SetRegionConfVer(1), core.SetRegionVersion(1)) - tests.MustPutRegionInfo(re, cluster, r1) - r2 := core.NewTestRegionInfo(20, 1, []byte("b"), []byte("c"), core.SetWrittenBytes(2000), core.SetReadBytes(0), core.SetRegionConfVer(2), core.SetRegionVersion(3)) - tests.MustPutRegionInfo(re, cluster, r2) - r3 := core.NewTestRegionInfo(30, 1, []byte("c"), []byte(""), core.SetWrittenBytes(500), core.SetReadBytes(800), core.SetRegionConfVer(3), core.SetRegionVersion(2)) - tests.MustPutRegionInfo(re, cluster, r3) + for _, r := range regions { + tests.MustPutRegionInfo(re, cluster, r) + } urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result)) @@ -697,12 +757,45 @@ func (suite *operatorTestSuite) checkBalanceRegions(cluster *tests.TestCluster) re.Equal(2, len(result.Ops)) } +func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) { + re := suite.Require() + + stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + {10, []uint64{0, 1}}, + {20, []uint64{0, 2}}, + {30, []uint64{0, 1}}, + }) + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + pauseAllCheckers(re, cluster) + result := handler.MigrationResult{} + for _, r := range regions { + tests.MustPutRegionInfo(re, cluster, r) + } + + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result)) + re.NoError(e) + re.Equal(1, len(result.Ops)) + validateMigtationOut(result.Ops, []uint64{1}) + validateMigtationIn(result.Ops, []uint64{4}) +} + func (suite *operatorTestSuite) TestBalanceRegions() { - // use a new environment to avoid being affected by other tests + use a new environment to avoid being affected by other tests env := tests.NewSchedulingTestEnvironment(suite.T(), func(conf *config.Config, _ string) { conf.Replication.MaxReplicas = 1 }) - env.RunTestBasedOnMode(suite.checkBalanceRegions) + env.RunTestBasedOnMode(suite.checkBalanceRegions1) env.Cleanup() + env2 := tests.NewSchedulingTestEnvironment(suite.T(), + func(conf *config.Config, _ string) { + conf.Replication.MaxReplicas = 1 + }) + env2.RunTestBasedOnMode(suite.checkBalanceRegions2) + env2.Cleanup() } From b9e3ca29d3bf80ee7dbcacf3c3186f6f95ef5ae3 Mon Sep 17 00:00:00 2001 From: Calvin Neo Date: Thu, 5 Sep 2024 16:10:59 +0800 Subject: [PATCH 7/7] state Signed-off-by: Calvin Neo --- pkg/schedule/handler/handler.go | 58 ++++++++++--------- server/api/region.go | 4 +- server/api/router.go | 2 +- tests/server/api/operator_test.go | 95 ++++++++++++++++++++++++++++--- 4 files changed, 119 insertions(+), 40 deletions(-) diff --git a/pkg/schedule/handler/handler.go b/pkg/schedule/handler/handler.go index c92cd2c67ea..cbd88bb66b0 100644 --- a/pkg/schedule/handler/handler.go +++ b/pkg/schedule/handler/handler.go @@ -1376,40 +1376,13 @@ type MigrationResult struct { Ops []*MigrationOp `json:"ops"` } -// BalanceRegion checks if regions are imbalanced and rebalance them. -func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) { - startKey, err := hex.DecodeString(rawStartKey) - if err != nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, err - } - endKey, err := hex.DecodeString(rawEndKey) - if err != nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, err - } - c := h.GetCluster() - if c == nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - co := h.GetCoordinator() - if co == nil { - return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() - } - regions := c.ScanRegions(startKey, endKey, -1) - regionIDMap := make(map[uint64]*core.RegionInfo) - for _, r := range regions { - regionIDMap[r.GetID()] = r - } - - stores := c.GetStores() +func ComputeCandidateStores(requiredLabels []*metapb.StoreLabel, stores []*core.StoreInfo, regions []*core.RegionInfo) []*StoreRegionSet { candidates := make([]*StoreRegionSet, 0) for _, s := range stores { storeLabelMap := make(map[string]*metapb.StoreLabel) for _, l := range s.GetLabels() { storeLabelMap[l.Key] = l } - if len(requiredLabels) != len(storeLabelMap) { - continue - } gotLabels := true for _, larg := range requiredLabels { if l, ok := storeLabelMap[larg.Key]; ok { @@ -1442,6 +1415,35 @@ func (h *Handler) BalanceRegion(rawStartKey, rawEndKey string, requiredLabels [] } candidates = append(candidates, candidate) } + return candidates +} + +// RedistibuteRegions checks if regions are imbalanced and rebalance them. +func (h *Handler) RedistibuteRegions(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) { + startKey, err := hex.DecodeString(rawStartKey) + if err != nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, err + } + endKey, err := hex.DecodeString(rawEndKey) + if err != nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, err + } + c := h.GetCluster() + if c == nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + co := h.GetCoordinator() + if co == nil { + return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs() + } + regions := c.ScanRegions(startKey, endKey, -1) + regionIDMap := make(map[uint64]*core.RegionInfo) + for _, r := range regions { + regionIDMap[r.GetID()] = r + } + + stores := c.GetStores() + candidates := ComputeCandidateStores(requiredLabels, stores, regions) senders, receivers, ops := MigrationPlan(candidates) diff --git a/server/api/region.go b/server/api/region.go index 6c60d7fcd7b..d04bfb00c20 100644 --- a/server/api/region.go +++ b/server/api/region.go @@ -129,12 +129,12 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R h.rd.JSON(w, http.StatusOK, state) } -func (h *regionsHandler) BalanceRegion(w http.ResponseWriter, r *http.Request) { +func (h *regionsHandler) RedistibuteRegions(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) rawStartKey := vars["startKey"] rawEndKey := vars["endKey"] storeLabels := make([]*metapb.StoreLabel, 0) - result, err := h.Handler.BalanceRegion(rawStartKey, rawEndKey, storeLabels) + result, err := h.Handler.RedistibuteRegions(rawStartKey, rawEndKey, storeLabels) if err != nil { h.rd.JSON(w, http.StatusBadRequest, err.Error()) return diff --git a/server/api/router.go b/server/api/router.go index e8b12e88cca..9f0d07bdfb5 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -284,7 +284,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(clusterRouter, "/regions/replicated", regionsHandler.CheckRegionsReplicated, setMethods(http.MethodGet), setQueries("startKey", "{startKey}", "endKey", "{endKey}"), setAuditBackend(prometheus)) - registerFunc(clusterRouter, "/regions/balance", regionsHandler.BalanceRegion, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(clusterRouter, "/regions/balance", regionsHandler.RedistibuteRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(apiRouter, "/version", newVersionHandler(rd).GetVersion, setMethods(http.MethodGet), setAuditBackend(prometheus)) registerFunc(apiRouter, "/status", newStatusHandler(svr, rd).GetPDStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/tests/server/api/operator_test.go b/tests/server/api/operator_test.go index 7a8105f401e..9a63dd55345 100644 --- a/tests/server/api/operator_test.go +++ b/tests/server/api/operator_test.go @@ -26,6 +26,8 @@ import ( "time" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/schedule/handler" @@ -661,7 +663,7 @@ type regionStoresPair struct { StorePos []uint64 } -func buildBalanceRegionTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) { +func buildRedistributeRegionsTestCases(storeIDs []uint64, regionDist []regionStoresPair) ([]*metapb.Store, []*core.RegionInfo) { stores := []*metapb.Store{} regions := []*core.RegionInfo{} for _, i := range storeIDs { @@ -732,10 +734,10 @@ func validateMigtationIn(ops []*handler.MigrationOp, storeIDs []uint64) []uint64 return rl } -func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) { +func (suite *operatorTestSuite) checkRedistributeRegions1(cluster *tests.TestCluster) { re := suite.Require() - stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{ {10, []uint64{0}}, {20, []uint64{0}}, {30, []uint64{0}}, @@ -757,10 +759,10 @@ func (suite *operatorTestSuite) checkBalanceRegions1(cluster *tests.TestCluster) re.Equal(2, len(result.Ops)) } -func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) { +func (suite *operatorTestSuite) checkRedistributeRegions2(cluster *tests.TestCluster) { re := suite.Require() - stores, regions := buildBalanceRegionTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{ {10, []uint64{0, 1}}, {20, []uint64{0, 2}}, {30, []uint64{0, 1}}, @@ -784,18 +786,93 @@ func (suite *operatorTestSuite) checkBalanceRegions2(cluster *tests.TestCluster) validateMigtationIn(result.Ops, []uint64{4}) } -func (suite *operatorTestSuite) TestBalanceRegions() { - use a new environment to avoid being affected by other tests +func (suite *operatorTestSuite) TestRedistributeRegions() { env := tests.NewSchedulingTestEnvironment(suite.T(), func(conf *config.Config, _ string) { conf.Replication.MaxReplicas = 1 }) - env.RunTestBasedOnMode(suite.checkBalanceRegions1) + env.RunTestBasedOnMode(suite.checkRedistributeRegions1) env.Cleanup() env2 := tests.NewSchedulingTestEnvironment(suite.T(), func(conf *config.Config, _ string) { conf.Replication.MaxReplicas = 1 }) - env2.RunTestBasedOnMode(suite.checkBalanceRegions2) + env2.RunTestBasedOnMode(suite.checkRedistributeRegions2) env2.Cleanup() } + +func (suite *operatorTestSuite) checkRedistributeRegions3(cluster *tests.TestCluster) { + re := suite.Require() + + stores, regions := buildRedistributeRegionsTestCases([]uint64{1, 2, 4}, []regionStoresPair{ + {10, []uint64{0, 1}}, + {20, []uint64{0, 2}}, + {30, []uint64{0, 1}}, + }) + + for _, store := range stores { + tests.MustPutStore(re, cluster, store) + } + + pauseAllCheckers(re, cluster) + result := handler.MigrationResult{} + for _, r := range regions { + tests.MustPutRegionInfo(re, cluster, r) + } + + urlPrefix := fmt.Sprintf("%s/pd/api/v1", cluster.GetLeaderServer().GetAddr()) + e := tu.CheckPostJSON(tests.TestDialClient, fmt.Sprintf("%s/regions/balance", urlPrefix), []byte(``), tu.StatusOK(re), tu.ExtractJSON(re, &result)) + re.NoError(e) + re.Equal(1, len(result.Ops)) + validateMigtationOut(result.Ops, []uint64{1}) + validateMigtationIn(result.Ops, []uint64{4}) +} + +func TestComputeCandidateStores(t *testing.T) { + re := require.New(t) + stores := []*core.StoreInfo{} + + stats := &pdpb.StoreStats{ + Capacity: 100, + Available: 100, + } + stores = append(stores, core.NewStoreInfo( + &metapb.Store{ + Id: 1, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}}, + LastHeartbeat: time.Now().UnixNano(), + }, + core.SetStoreStats(stats), + core.SetLastHeartbeatTS(time.Now()), + )) + stores = append(stores, core.NewStoreInfo( + &metapb.Store{ + Id: 2, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "engine", Value: "tiflash"}}, + LastHeartbeat: time.Now().UnixNano(), + }, + core.SetStoreStats(stats), + core.SetLastHeartbeatTS(time.Now()), + )) + stores = append(stores, core.NewStoreInfo( + &metapb.Store{ + Id: 3, + State: metapb.StoreState_Up, + NodeState: metapb.NodeState_Serving, + Labels: []*metapb.StoreLabel{{Key: "zone", Value: "z2"}}, + LastHeartbeat: time.Now().UnixNano(), + }, + core.SetStoreStats(stats), + core.SetLastHeartbeatTS(time.Now()), + )) + + regions := []*core.RegionInfo{} + + re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}}, stores, regions), 2, "case 1") + re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z2"}}, stores, regions), 1, "case 1") + re.Len(handler.ComputeCandidateStores([]*metapb.StoreLabel{{Key: "zone", Value: "z1"}, {Key: "engine", Value: "tiflash"}}, stores, regions), 1, "case 1") +}