DNM Support BalaceRegion #8497

Open · wants to merge 7 commits into base: master
2 changes: 2 additions & 0 deletions Makefile
@@ -243,12 +243,14 @@ SUBMODULES := $(filter $(shell find . -iname "go.mod" -exec dirname {} \;),\
test: install-tools
# testing all pkgs...
@$(FAILPOINT_ENABLE)
echo ${TEST_PKGS}
CGO_ENABLED=1 go test -tags tso_function_test,deadlock -timeout 20m -race -cover $(TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

basic-test: install-tools
# testing basic pkgs...
@$(FAILPOINT_ENABLE)
echo ${TEST_PKGS}
go test $(BASIC_TEST_PKGS) || { $(FAILPOINT_DISABLE); exit 1; }
@$(FAILPOINT_DISABLE)

208 changes: 208 additions & 0 deletions pkg/schedule/handler/handler.go
@@ -16,9 +16,12 @@ package handler

import (
"bytes"
"cmp"
"context"
"encoding/hex"
"fmt"
"net/http"
"slices"
"strconv"
"strings"
"time"
@@ -1261,6 +1264,211 @@ func (h *Handler) SplitRegions(ctx context.Context, rawSplitKeys []any, retryLimit
return s, nil
}

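// StoreRegionSet records a candidate store together with the regions that currently have a peer on it.
// RegionIDSet maps a region ID to whether that region has already been picked for migration away from
// this store; OriginalPeer keeps, for each region, the peer that lives on this store.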
type StoreRegionSet struct {
ID uint64
Info *core.StoreInfo
RegionIDSet map[uint64]bool
OriginalPeer map[uint64]*metapb.Peer
}

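// MigrationOp describes one batch of regions to move from FromStore to ToStore.
// Regions holds the IDs of the regions to be moved; OriginalPeer records the peer of the
// last picked region and is used as the template for the new peer when operators are built.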
type MigrationOp struct {
FromStore uint64 `json:"from_store"`
ToStore uint64 `json:"to_store"`
ToStoreInfo *core.StoreInfo `json:"to_store_info"`
OriginalPeer *metapb.Peer `json:"original_peer"`
Regions map[uint64]bool `json:"regions"`
}

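// PickRegions selects up to n regions that have a peer on fromStore but not on toStore,
// marks them as taken in fromStore.RegionIDSet, and returns the resulting MigrationOp.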
func PickRegions(n int, fromStore *StoreRegionSet, toStore *StoreRegionSet) *MigrationOp {
o := MigrationOp{
FromStore: fromStore.ID,
ToStore: toStore.ID,
ToStoreInfo: toStore.Info,
Regions: make(map[uint64]bool),
}
for r, removed := range fromStore.RegionIDSet {
if n == 0 {
break
}
if removed {
continue
}
if _, exist := toStore.RegionIDSet[r]; !exist {
// If toStore doesn't have this region, create a move op.
o.Regions[r] = false
o.OriginalPeer = fromStore.OriginalPeer[r]
fromStore.RegionIDSet[r] = true
n--
}
}
return &o
}

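// MigrationPlan computes a set of MigrationOps that evens out the region count across the
// given candidate stores. Each store's target is totalRegionCount/len(stores), and the
// remainder is spread one region at a time over the stores holding the most regions;
// for example, 10 regions over 3 stores yields targets of 4, 3 and 3. It returns the
// indices of the sender stores, the receiver stores, and the planned ops.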
func MigrationPlan(stores []*StoreRegionSet) ([]int, []int, []*MigrationOp) {
totalRegionCount := 0
for _, store := range stores {
totalRegionCount += len(store.RegionIDSet)
}
for _, store := range stores {
percentage := 100 * float64(len(store.RegionIDSet)) / float64(totalRegionCount)
log.Info("!!! store region dist",
zap.Uint64("store-id", store.ID),
zap.Int("num-region", len(store.RegionIDSet)),
zap.String("percentage", fmt.Sprintf("%.2f%%", percentage)))
}
avr := totalRegionCount / len(stores)
remainder := totalRegionCount % len(stores)
// Sort candidate stores by region count in descending order so the fullest stores are matched with the larger targets.
slices.SortStableFunc(stores, func(lhs, rhs *StoreRegionSet) int {
return -cmp.Compare(len(lhs.RegionIDSet), len(rhs.RegionIDSet))
})
expectedCount := []int{}
for i := 0; i < remainder; i++ {
expectedCount = append(expectedCount, avr+1)
}
for i := remainder; i < len(stores); i++ {
expectedCount = append(expectedCount, avr)
}

senders := []int{}
receivers := []int{}
sendersVolume := []int{}
receiversVolume := []int{}
for i, store := range stores {
if len(store.RegionIDSet) < expectedCount[i] {
receivers = append(receivers, i)
receiversVolume = append(receiversVolume, expectedCount[i]-len(store.RegionIDSet))
}
if len(store.RegionIDSet) > expectedCount[i] {
senders = append(senders, i)
sendersVolume = append(sendersVolume, len(store.RegionIDSet)-expectedCount[i])
}
}

ops := []*MigrationOp{}

for i, senderIndex := range senders {
fromStore := stores[senderIndex]
for {
if sendersVolume[i] <= 0 {
break
}
for j, receiverIndex := range receivers {
toStore := stores[receiverIndex]
if receiversVolume[j] > 0 {
n := sendersVolume[i]
if n > receiversVolume[j] {
n = receiversVolume[j]
}
receiversVolume[j] -= n
sendersVolume[i] -= n
ops = append(ops, PickRegions(n, fromStore, toStore))
}
}
}
}

return senders, receivers, ops
}

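// MigrationResult is the JSON payload returned by RedistibuteRegions: ErrorCode is 0 on
// success and 1 on failure, and Ops lists the planned migration operations.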
type MigrationResult struct {
ErrorCode uint64 `json:"error_code"`
Ops []*MigrationOp `json:"ops"`
}

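// ComputeCandidateStores returns a StoreRegionSet for every store whose labels contain all of
// the required labels, populated with the regions that have a peer on that store.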
func ComputeCandidateStores(requiredLabels []*metapb.StoreLabel, stores []*core.StoreInfo, regions []*core.RegionInfo) []*StoreRegionSet {
candidates := make([]*StoreRegionSet, 0)
for _, s := range stores {
storeLabelMap := make(map[string]*metapb.StoreLabel)
for _, l := range s.GetLabels() {
storeLabelMap[l.Key] = l
}
gotLabels := true
for _, larg := range requiredLabels {
if l, ok := storeLabelMap[larg.Key]; ok {
if larg.Value != l.Value {
gotLabels = false
break
}
} else {
gotLabels = false
break
}
}

if !gotLabels {
continue
}
candidate := &StoreRegionSet{
ID: s.GetID(),
Info: s,
RegionIDSet: make(map[uint64]bool),
OriginalPeer: make(map[uint64]*metapb.Peer),
}
for _, r := range regions {
for _, p := range r.GetPeers() {
if p.StoreId == s.GetID() {
candidate.RegionIDSet[r.GetID()] = false
candidate.OriginalPeer[r.GetID()] = p
}
}
}
candidates = append(candidates, candidate)
}
return candidates
}

// RedistibuteRegions checks whether the regions in the given key range are imbalanced across the candidate stores and rebalances them.
func (h *Handler) RedistibuteRegions(rawStartKey, rawEndKey string, requiredLabels []*metapb.StoreLabel) (MigrationResult, error) {
startKey, err := hex.DecodeString(rawStartKey)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
}
endKey, err := hex.DecodeString(rawEndKey)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
}
c := h.GetCluster()
if c == nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
co := h.GetCoordinator()
if co == nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, errs.ErrNotBootstrapped.GenWithStackByArgs()
}
regions := c.ScanRegions(startKey, endKey, -1)
regionIDMap := make(map[uint64]*core.RegionInfo)
for _, r := range regions {
regionIDMap[r.GetID()] = r
}

stores := c.GetStores()
candidates := ComputeCandidateStores(requiredLabels, stores, regions)

senders, receivers, ops := MigrationPlan(candidates)

log.Info("Migration plan details", zap.Any("senders", senders), zap.Any("receivers", receivers), zap.Any("ops", ops))

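// Turn each planned migration op into move-peer operators and submit them to the operator controller.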
for _, op := range ops {
for rid := range op.Regions {
newPeer := &metapb.Peer{StoreId: op.ToStore, Role: op.OriginalPeer.Role, IsWitness: op.OriginalPeer.IsWitness}
log.Debug("Create balace region op", zap.Uint64("from", op.FromStore), zap.Uint64("to", op.ToStore), zap.Uint64("region_id", rid))
o, err := operator.CreateMovePeerOperator("balance-region", c, regionIDMap[rid], operator.OpReplica, op.FromStore, newPeer)
if err != nil {
return MigrationResult{ErrorCode: 1, Ops: nil}, err
}
co.GetOperatorController().AddOperator(o)
}
}

result := MigrationResult{
ErrorCode: 0,
Ops: ops,
}

return result, nil
}

// CheckRegionsReplicated checks if regions are replicated.
func (h *Handler) CheckRegionsReplicated(rawStartKey, rawEndKey string) (string, error) {
startKey, err := hex.DecodeString(rawStartKey)
14 changes: 14 additions & 0 deletions server/api/region.go
@@ -25,6 +25,7 @@ import (

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/keyspace"
@@ -128,6 +129,19 @@ func (h *regionsHandler) CheckRegionsReplicated(w http.ResponseWriter, r *http.R
h.rd.JSON(w, http.StatusOK, state)
}

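// RedistibuteRegions handles POST /regions/balance. It currently passes an empty label
// filter, so every store is treated as a candidate.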
func (h *regionsHandler) RedistibuteRegions(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
rawStartKey := vars["startKey"]
rawEndKey := vars["endKey"]
storeLabels := make([]*metapb.StoreLabel, 0)
result, err := h.Handler.RedistibuteRegions(rawStartKey, rawEndKey, storeLabels)
if err != nil {
h.rd.JSON(w, http.StatusBadRequest, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, result)
}

type regionsHandler struct {
*server.Handler
svr *server.Server
1 change: 1 addition & 0 deletions server/api/router.go
@@ -284,6 +284,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
registerFunc(clusterRouter, "/regions/split", regionsHandler.SplitRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/regions/range-holes", regionsHandler.GetRangeHoles, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/replicated", regionsHandler.CheckRegionsReplicated, setMethods(http.MethodGet), setQueries("startKey", "{startKey}", "endKey", "{endKey}"), setAuditBackend(prometheus))
registerFunc(clusterRouter, "/regions/balance", regionsHandler.RedistibuteRegions, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))

registerFunc(apiRouter, "/version", newVersionHandler(rd).GetVersion, setMethods(http.MethodGet), setAuditBackend(prometheus))
registerFunc(apiRouter, "/status", newStatusHandler(svr, rd).GetPDStatus, setMethods(http.MethodGet), setAuditBackend(prometheus))