
Commit

Merge branch 'release-5.0-rc' into release-5.0-rc-65e08e2b7de7
nolouch authored Jan 4, 2021
2 parents c47efdd + 27c29b7 commit b95c001
Showing 22 changed files with 168 additions and 418 deletions.
27 changes: 7 additions & 20 deletions pkg/mock/mockcluster/mockcluster.go
@@ -324,52 +324,39 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(
    regionID uint64, leaderID uint64,
    readBytes, readKeys uint64,
    reportInterval uint64,
-   followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
+   followerIds []uint64) {
    r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
    r = r.Clone(core.SetReadBytes(readBytes))
    r = r.Clone(core.SetReadKeys(readKeys))
    r = r.Clone(core.SetReportInterval(reportInterval))
-   filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
-   if len(filledNums) > 0 {
-       filledNum = filledNums[0]
-   }
-
-   var items []*statistics.HotPeerStat
-   for i := 0; i < filledNum; i++ {
+   num := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
+   for i := 0; i < num; i++ {
        items := mc.HotCache.CheckRead(r)
        for _, item := range items {
            mc.HotCache.Update(item)
        }
    }
    mc.PutRegion(r)
-   return items
}

// AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info.
func (mc *Cluster) AddLeaderRegionWithWriteInfo(
    regionID uint64, leaderID uint64,
    writtenBytes, writtenKeys uint64,
    reportInterval uint64,
-   followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat {
+   followerIds []uint64) {
    r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
    r = r.Clone(core.SetWrittenBytes(writtenBytes))
    r = r.Clone(core.SetWrittenKeys(writtenKeys))
    r = r.Clone(core.SetReportInterval(reportInterval))
-
-   filledNum := mc.HotCache.GetFilledPeriod(statistics.WriteFlow)
-   if len(filledNums) > 0 {
-       filledNum = filledNums[0]
-   }
-
-   var items []*statistics.HotPeerStat
-   for i := 0; i < filledNum; i++ {
-       items = mc.HotCache.CheckWrite(r)
+   num := mc.HotCache.GetFilledPeriod(statistics.WriteFlow)
+   for i := 0; i < num; i++ {
+       items := mc.HotCache.CheckWrite(r)
        for _, item := range items {
            mc.HotCache.Update(item)
        }
    }
    mc.PutRegion(r)
-   return items
}

// UpdateStoreLeaderWeight updates store leader weight.
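For orientation, a minimal sketch (not part of this commit) of how the simplified helpers are called after the change — the filledNums variadic parameter and the returned stats are gone, while the rest of the call mirrors the tests removed from hot_test.go further down:

    // Hypothetical snippet; KB and statistics.RegionHeartBeatReportInterval are the
    // constants already used by the scheduler tests in this repository.
    tc := mockcluster.NewCluster(config.NewTestOptions())
    tc.AddLeaderRegionWithWriteInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0,
        statistics.RegionHeartBeatReportInterval, []uint64{2, 3})
    tc.AddLeaderRegionWithReadInfo(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0,
        statistics.RegionHeartBeatReportInterval, []uint64{2, 3})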
4 changes: 2 additions & 2 deletions pkg/movingaverage/avg_over_time_test.go
@@ -75,14 +75,14 @@ func (t *testAvgOverTimeSuite) TestChange(c *C) {
}

func (t *testAvgOverTimeSuite) TestMinFilled(c *C) {
-   interval := 10 * time.Second
+   interval := 10
    rate := 1.0
    for aotSize := 2; aotSize < 10; aotSize++ {
        for mfSize := 2; mfSize < 10; mfSize++ {
            tm := NewTimeMedian(aotSize, mfSize, interval)
            for i := 0; i < tm.GetFilledPeriod(); i++ {
                c.Assert(tm.Get(), Equals, 0.0)
-               tm.Add(rate*interval.Seconds(), interval)
+               tm.Add(rate*float64(interval), time.Duration(interval)*time.Second)
            }
            c.Assert(tm.Get(), Equals, rate)
        }
7 changes: 4 additions & 3 deletions pkg/movingaverage/time_median.go
@@ -29,10 +29,11 @@ type TimeMedian struct {
}

// NewTimeMedian returns a TimeMedian with given size.
-func NewTimeMedian(aotSize, mfSize int, reportInterval time.Duration) *TimeMedian {
+func NewTimeMedian(aotSize, mfSize, reportInterval int) *TimeMedian {
+   interval := time.Duration(aotSize*reportInterval) * time.Second
    return &TimeMedian{
-       aotInterval: reportInterval,
-       aot:         NewAvgOverTime(time.Duration(aotSize) * reportInterval),
+       aotInterval: interval,
+       aot:         NewAvgOverTime(interval),
        mf:          NewMedianFilter(mfSize),
        aotSize:     aotSize,
        mfSize:      mfSize,
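A brief usage sketch of the new NewTimeMedian signature (the values are illustrative, following the test above): the report interval is now passed as plain seconds, and the averaging window becomes aotSize * reportInterval seconds internally.

    // Assumed example: a 2-sample average window over 10-second reports, median filter of size 5.
    tm := movingaverage.NewTimeMedian(2, 5, 10)
    tm.Add(1.0*10, 10*time.Second) // one report interval of data at rate 1.0
    _ = tm.Get()                   // approaches 1.0 once GetFilledPeriod() intervals have been added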
2 changes: 1 addition & 1 deletion server/config/config.go
@@ -753,7 +753,7 @@ const (
    defaultSchedulerMaxWaitingOperator = 5
    defaultLeaderSchedulePolicy        = "count"
    defaultStoreLimitMode              = "manual"
-   defaultEnableJointConsensus        = true
+   defaultEnableJointConsensus        = false
    defaultEnableCrossTableMerge       = true
)

7 changes: 7 additions & 0 deletions server/config/persist_options.go
@@ -490,6 +490,13 @@ func (o *PersistOptions) IsUseJointConsensus() bool {
    return o.GetScheduleConfig().EnableJointConsensus
}

+// SetEnableJointConsensus sets whether to enable joint consensus. It's only used for tests.
+func (o *PersistOptions) SetEnableJointConsensus(enableJointConsensus bool) {
+   v := o.GetScheduleConfig().Clone()
+   v.EnableJointConsensus = enableJointConsensus
+   o.SetScheduleConfig(v)
+}
+
// GetHotRegionCacheHitsThreshold is a threshold to decide if a region is hot.
func (o *PersistOptions) GetHotRegionCacheHitsThreshold() int {
    return int(o.GetScheduleConfig().HotRegionCacheHitsThreshold)
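With the default flipped to false in config.go above, tests that exercise joint-consensus operators now opt back in explicitly, exactly as the test changes further down in this commit do:

    opts := config.NewTestOptions()
    opts.SetEnableJointConsensus(true) // re-enable joint consensus for this test only
    cluster := mockcluster.NewCluster(opts)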
2 changes: 1 addition & 1 deletion server/schedule/checker/rule_checker.go
@@ -62,7 +62,7 @@ func (c *RuleChecker) Check(region *core.RegionInfo) *operator.Operator {
        op, err := c.fixRulePeer(region, fit, rf)
        if err != nil {
            log.Debug("fail to fix rule peer", zap.String("rule-group", rf.Rule.GroupID), zap.String("rule-id", rf.Rule.ID), errs.ZapError(err))
-           break
+           continue
        }
        if op != nil {
            return op
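The changed lines sit inside the loop over the region's rule fits; a simplified sketch (not the verbatim source) of why continue matters — a failure to fix one rule's peers no longer abandons the remaining rules in the same pass:

    for _, rf := range fit.RuleFits {
        op, err := c.fixRulePeer(region, fit, rf)
        if err != nil {
            continue // previously `break`, which skipped every later rule
        }
        if op != nil {
            return op
        }
    }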
37 changes: 37 additions & 0 deletions server/schedule/checker/rule_checker_test.go
@@ -337,3 +337,40 @@ func (s *testRuleCheckerSuite) TestIssue2419(c *C) {
    c.Assert(op.Step(1).(operator.PromoteLearner).ToStore, Equals, uint64(4))
    c.Assert(op.Step(2).(operator.RemovePeer).FromStore, Equals, uint64(3))
}
+
+func (s *testRuleCheckerSuite) TestIssue3293(c *C) {
+   s.cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+   s.cluster.AddLabelsStore(2, 1, map[string]string{"host": "host1"})
+   s.cluster.AddLabelsStore(3, 1, map[string]string{"host": "host2"})
+   s.cluster.AddLabelsStore(4, 1, map[string]string{"host": "host4"})
+   s.cluster.AddLabelsStore(5, 1, map[string]string{"host": "host5"})
+   s.cluster.AddLeaderRegionWithRange(1, "", "", 1, 2)
+   err := s.ruleManager.SetRule(&placement.Rule{
+       GroupID: "TiDB_DDL_51",
+       ID:      "0",
+       Role:    placement.Follower,
+       Count:   1,
+       LabelConstraints: []placement.LabelConstraint{
+           {
+               Key: "dc",
+               Values: []string{
+                   "sh",
+               },
+               Op: placement.In,
+           },
+       },
+   })
+   c.Assert(err, IsNil)
+   err = s.ruleManager.SetRule(&placement.Rule{
+       GroupID: "TiDB_DDL_51",
+       ID:      "default",
+       Role:    placement.Voter,
+       Count:   3,
+   })
+   c.Assert(err, IsNil)
+   err = s.ruleManager.DeleteRule("pd", "default")
+   c.Assert(err, IsNil)
+   op := s.rc.Check(s.cluster.GetRegion(1))
+   c.Assert(op, NotNil)
+   c.Assert(op.Desc(), Equals, "add-rule-peer")
+}
1 change: 1 addition & 0 deletions server/schedule/operator/builder_test.go
@@ -31,6 +31,7 @@ type testBuilderSuite struct {

func (s *testBuilderSuite) SetUpTest(c *C) {
    opts := config.NewTestOptions()
+   opts.SetEnableJointConsensus(true)
    s.cluster = mockcluster.NewCluster(opts)
    s.cluster.SetLabelPropertyConfig(config.LabelPropertyConfig{
        opt.RejectLeader: {{Key: "noleader", Value: "true"}},
1 change: 1 addition & 0 deletions server/schedule/operator/create_operator_test.go
@@ -37,6 +37,7 @@ type testCreateOperatorSuite struct {

func (s *testCreateOperatorSuite) SetUpTest(c *C) {
    opts := config.NewTestOptions()
+   opts.SetEnableJointConsensus(true)
    s.cluster = mockcluster.NewCluster(opts)
    s.cluster.SetLabelPropertyConfig(config.LabelPropertyConfig{
        opt.RejectLeader: {{Key: "noleader", Value: "true"}},
5 changes: 4 additions & 1 deletion server/schedule/operator_controller_test.go
@@ -573,6 +573,7 @@ func (t *testOperatorControllerSuite) TestDispatchUnfinishedStep(c *C) {

func (t *testOperatorControllerSuite) TestStoreLimitWithMerge(c *C) {
    cfg := config.NewTestOptions()
+   cfg.SetEnableJointConsensus(true)
    tc := mockcluster.NewCluster(cfg)
    tc.SetMaxMergeRegionSize(2)
    tc.SetMaxMergeRegionKeys(2)
@@ -663,7 +664,9 @@ func (t *testOperatorControllerSuite) TestAddWaitingOperator(c *C) {
    cluster := mockcluster.NewCluster(config.NewTestOptions())
    stream := hbstream.NewTestHeartbeatStreams(t.ctx, cluster.ID, cluster, false /* no need to run */)
    controller := NewOperatorController(t.ctx, cluster, stream)
-
+   cluster.AddLabelsStore(1, 1, map[string]string{"host": "host1"})
+   cluster.AddLabelsStore(2, 1, map[string]string{"host": "host2"})
+   cluster.AddLabelsStore(3, 1, map[string]string{"host": "host3"})
    addPeerOp := func(i uint64) *operator.Operator {
        start := fmt.Sprintf("%da", i)
        end := fmt.Sprintf("%db", i)
23 changes: 14 additions & 9 deletions server/schedule/placement/fit.go
@@ -128,6 +128,7 @@ func FitRegion(stores StoreSet, region *core.RegionInfo, rules []*Rule) *RegionF
}

type fitWorker struct {
+   stores  []*core.StoreInfo
    bestFit RegionFit  // update during execution
    peers   []*fitPeer // p.selected is updated during execution.
    rules   []*Rule
@@ -146,6 +147,7 @@ func newFitWorker(stores StoreSet, region *core.RegionInfo, rules []*Rule) *fitW
    sort.Slice(peers, func(i, j int) bool { return peers[i].GetId() < peers[j].GetId() })

    return &fitWorker{
+       stores:  stores.GetStores(),
        bestFit: RegionFit{RuleFits: make([]*RuleFit, len(rules))},
        peers:   peers,
        rules:   rules,
@@ -164,16 +166,19 @@ func (w *fitWorker) fitRule(index int) bool {
    if index >= len(w.rules) {
        return false
    }
-   // Only consider stores:
-   // 1. Match label constraints
-   // 2. Role match, or can match after transformed.
-   // 3. Not selected by other rules.

    var candidates []*fitPeer
-   for _, p := range w.peers {
-       if MatchLabelConstraints(p.store, w.rules[index].LabelConstraints) &&
-           p.matchRoleLoose(w.rules[index].Role) &&
-           !p.selected {
-           candidates = append(candidates, p)
+   if checkRule(w.rules[index], w.stores) {
+       // Only consider stores:
+       // 1. Match label constraints
+       // 2. Role match, or can match after transformed.
+       // 3. Not selected by other rules.
+       for _, p := range w.peers {
+           if MatchLabelConstraints(p.store, w.rules[index].LabelConstraints) &&
+               p.matchRoleLoose(w.rules[index].Role) &&
+               !p.selected {
+               candidates = append(candidates, p)
+           }
        }
    }

2 changes: 1 addition & 1 deletion server/schedule/placement/rule_list.go
@@ -200,7 +200,7 @@ func (rl ruleList) getRulesForApplyRegion(start, end []byte) []*Rule {
    i := sort.Search(len(rl.ranges), func(i int) bool {
        return bytes.Compare(rl.ranges[i].startKey, start) > 0
    })
-   if i != len(rl.ranges) && (len(end) == 0 || bytes.Compare(end, rl.ranges[i].startKey) > 0) {
+   if i == 0 || i != len(rl.ranges) && (len(end) == 0 || bytes.Compare(end, rl.ranges[i].startKey) > 0) {
        return nil
    }
    return rl.ranges[i-1].applyRules
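The added i == 0 guard covers the boundary where even the first range starts after start, so no range can apply to the region; previously the code fell through to rl.ranges[i-1] with i == 0. A hypothetical illustration of that boundary:

    // Made-up start keys, not taken from the repository.
    startKeys := [][]byte{[]byte("b"), []byte("d")}
    start := []byte("a")
    i := sort.Search(len(startKeys), func(i int) bool { return bytes.Compare(startKeys[i], start) > 0 })
    // i == 0 here: no range starts at or before "a", so getRulesForApplyRegion
    // now returns nil instead of reading rl.ranges[i-1] out of range.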
11 changes: 11 additions & 0 deletions server/schedule/placement/rule_manager.go
@@ -614,3 +614,14 @@ func (m *RuleManager) IsInitialized() bool {
    defer m.RUnlock()
    return m.initialized
}
+
+// checkRule checks whether the rule can still produce a RuleFit in FitRegion,
+// in order to reduce the calculation.
+func checkRule(rule *Rule, stores []*core.StoreInfo) bool {
+   for _, store := range stores {
+       if MatchLabelConstraints(store, rule.LabelConstraints) {
+           return true
+       }
+   }
+   return false
+}
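A hedged sketch of the intended use inside the placement package (the rule mirrors TestIssue3293 above): fitWorker calls checkRule before enumerating candidate peers, so a rule that no store can possibly satisfy is skipped cheaply.

    rule := &Rule{
        GroupID: "TiDB_DDL_51",
        ID:      "0",
        Role:    Follower,
        Count:   1,
        LabelConstraints: []LabelConstraint{
            {Key: "dc", Op: In, Values: []string{"sh"}},
        },
    }
    if !checkRule(rule, stores) { // stores: []*core.StoreInfo, e.g. from StoreSet.GetStores()
        // No store matches the label constraints; skip building candidates for this rule.
    }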
8 changes: 4 additions & 4 deletions server/schedulers/hot_region.go
@@ -395,7 +395,7 @@ func (h *hotScheduler) addPendingInfluence(op *operator.Operator, srcStore, dstS
        h.regionPendings[regionID] = tmp
    }

-   schedulerStatus.WithLabelValues(h.GetName(), "pending_op_infos").Inc()
+   schedulerStatus.WithLabelValues(h.GetName(), "pending_op_create").Inc()
    return true
}

@@ -806,13 +806,13 @@ func (bs *balanceSolver) calcProgressiveRank() {
    greatDecRatio, minorDecRatio := bs.sche.conf.GetGreatDecRatio(), bs.sche.conf.GetMinorGreatDecRatio()
    switch {
    case byteHot && byteDecRatio <= greatDecRatio && keyHot && keyDecRatio <= greatDecRatio:
-       // If belong to the case, both byte rate and key rate will be more balanced, the best choice.
+       // Both byte rate and key rate are balanced, the best choice.
        rank = -3
    case byteDecRatio <= minorDecRatio && keyHot && keyDecRatio <= greatDecRatio:
-       // If belong to the case, byte rate will be not worsened, key rate will be more balanced.
+       // Byte rate is not worsened, key rate is balanced.
        rank = -2
    case byteHot && byteDecRatio <= greatDecRatio:
-       // If belong to the case, byte rate will be more balanced, ignore the key rate.
+       // Byte rate is balanced, ignore the key rate.
        rank = -1
    }
}
73 changes: 0 additions & 73 deletions server/schedulers/hot_test.go
@@ -1060,76 +1060,3 @@ func addRegionInfo(tc *mockcluster.Cluster, rwTy rwType, regions []testRegionInfo
        )
    }
}
-
-func (s *testHotCacheSuite) TestCheckRegionFlow(c *C) {
-   opt := config.NewTestOptions()
-   tc := mockcluster.NewCluster(opt)
-   tc.SetMaxReplicas(3)
-   tc.SetLocationLabels([]string{"zone", "host"})
-   tc.DisableFeature(versioninfo.JointConsensus)
-   s.checkRegionFlowTest(c, tc.AddLeaderRegionWithWriteInfo)
-   s.checkRegionFlowTest(c, tc.AddLeaderRegionWithReadInfo)
-}
-
-func (s *testHotCacheSuite) checkRegionFlowTest(c *C, heartbeat func(
-   regionID uint64, leaderID uint64,
-   readBytes, readKeys uint64,
-   reportInterval uint64,
-   followerIds []uint64, filledNums ...int) []*statistics.HotPeerStat) {
-   // hot degree increase
-   heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-   heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-   items := heartbeat(1, 1, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-   for _, item := range items {
-       c.Check(item.HotDegree, Equals, 3)
-   }
-
-   // transfer leader and skip the first heartbeat
-   items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3}, 1)
-   for _, item := range items {
-       c.Check(item.HotDegree, Equals, 3)
-   }
-
-   // move peer: add peer and remove peer
-   items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 3, 4}, 1)
-   for _, item := range items {
-       c.Check(item.HotDegree, Equals, 4)
-   }
-   items = heartbeat(1, 2, 512*KB*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{1, 4}, 1)
-   for _, item := range items {
-       if item.StoreID == 3 {
-           c.Check(item.IsNeedDelete(), IsTrue)
-           continue
-       }
-       c.Check(item.HotDegree, Equals, 5)
-   }
-}
-
-func (s *testHotCacheSuite) TestCheckRegionFlowWithDifferentThreshold(c *C) {
-   opt := config.NewTestOptions()
-   tc := mockcluster.NewCluster(opt)
-   tc.SetMaxReplicas(3)
-   tc.SetLocationLabels([]string{"zone", "host"})
-   tc.DisableFeature(versioninfo.JointConsensus)
-   // some peers are hot, and some are cold #3198
-   rate := uint64(512 * KB)
-   for i := 0; i < statistics.TopNN; i++ {
-       for j := 0; j < statistics.DefaultAotSize; j++ {
-           tc.AddLeaderRegionWithWriteInfo(uint64(i+100), 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-       }
-   }
-   items := tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3}, 1)
-   c.Check(items[0].GetThresholds()[0], Equals, float64(rate)*statistics.HotThresholdRatio)
-   // Threshold of store 1,2,3 is 409.6 KB and others are 1 KB
-   // Make the hot threshold of some store is high and the others are low
-   rate = 10 * KB
-   tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{2, 3, 4}, 1)
-   items = tc.AddLeaderRegionWithWriteInfo(201, 1, rate*statistics.RegionHeartBeatReportInterval, 0, statistics.RegionHeartBeatReportInterval, []uint64{3, 4}, 1)
-   for _, item := range items {
-       if item.StoreID < 4 {
-           c.Check(item.IsNeedDelete(), IsTrue)
-       } else {
-           c.Check(item.IsNeedDelete(), IsFalse)
-       }
-   }
-}
10 changes: 4 additions & 6 deletions server/schedulers/utils.go
@@ -334,16 +334,14 @@ func (li *storeLoadDetail) toHotPeersStat() *statistics.HotPeersStat {
    peers := make([]statistics.HotPeerStat, 0, len(li.HotPeers))
    var totalBytesRate, totalKeysRate float64
    for _, peer := range li.HotPeers {
-       if peer.HotDegree > 0 {
-           peers = append(peers, *peer.Clone())
-           totalBytesRate += peer.ByteRate
-           totalKeysRate += peer.KeyRate
-       }
+       peers = append(peers, *peer.Clone())
+       totalBytesRate += peer.ByteRate
+       totalKeysRate += peer.KeyRate
    }
    return &statistics.HotPeersStat{
        TotalBytesRate: math.Round(totalBytesRate),
        TotalKeysRate:  math.Round(totalKeysRate),
-       Count:          len(peers),
+       Count:          len(li.HotPeers),
        Stats:          peers,
    }
}