Skip to content

Commit

Permalink
*: some fixes for release 3.0 (#1624)
Browse files Browse the repository at this point in the history
* schedulers: balance region consider pending peer (#1617)

* schedulers: balance region consider pending peers

Signed-off-by: nolouch <nolouch@gmail.com>

* scheduler: check replica for hot region  (#1609)

* check replica for hot region scheduler

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* schedule: set influence according to region size (#1613)

* not consider store limit when executing merge

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* set influence according to region size

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* fix race problems

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* config: turn the default limit of the hot region schedule (#1616)

* config: turn the default limit of the hot region schedule

Signed-off-by: nolouch <nolouch@gmail.com>

* update CHANGELOG

Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx authored and nolouch committed Jul 11, 2019
1 parent 0f2c85e commit ee98bf9
Show file tree
Hide file tree
Showing 21 changed files with 279 additions and 45 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
+ Fix the issue about the limit of the hot region [#1552](https://github.com/pingcap/pd/pull/1552)
+ Add an option about grpc gateway [#1596](https://github.com/pingcap/pd/pull/1596)
+ Add the missing schedule config items [#1601](https://github.com/pingcap/pd/pull/1601)
+ Fix the issue about checking the number of replicas before scheduling for the hot region scheduler [#1609](https://github.com/pingcap/pd/pull/1609)
+ Set influence for the operator according to the region size [#1613](https://github.com/pingcap/pd/pull/1613)
+ Enlarge the default limit of the hot region scheduler [#1616](https://github.com/pingcap/pd/pull/1616)
+ Fix the issue about ignoring the pending peer when balancing regions [#1617](https://github.com/pingcap/pd/pull/1617)

## v3.0.0

Expand Down
1 change: 1 addition & 0 deletions conf/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ leader-schedule-limit = 4
region-schedule-limit = 64
replica-schedule-limit = 64
merge-schedule-limit = 8
hot-region-schedule-limit = 4
#enable-one-way-merge = false
#tolerant-size-ratio = 0.0

Expand Down
6 changes: 3 additions & 3 deletions pkg/mock/mockoption/mockoption.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ const (
defaultSplitMergeInterval = 0
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 4
defaultReplicaScheduleLimit = 8
defaultRegionScheduleLimit = 64
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 60
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
Expand Down
2 changes: 1 addition & 1 deletion server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,8 +379,8 @@ func (c *RaftCluster) GetAdjacentRegions(region *core.RegionInfo) (*core.RegionI
// UpdateStoreLabels updates a store's location labels.
func (c *RaftCluster) UpdateStoreLabels(storeID uint64, labels []*metapb.StoreLabel) error {
c.RLock()
defer c.RUnlock()
store := c.cachedCluster.GetStore(storeID)
c.RUnlock()
if store == nil {
return errors.Errorf("invalid store ID %d, not found", storeID)
}
Expand Down
7 changes: 7 additions & 0 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,13 @@ func (c *clusterInfo) RandFollowerRegion(storeID uint64, opts ...core.RegionOpti
return c.core.RandFollowerRegion(storeID, opts...)
}

// RandPendingRegion picks a random region that has a pending peer on the
// given store. The lookup is delegated to the core cluster while the read
// lock is held; opts are forwarded unchanged.
func (c *clusterInfo) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
	c.RLock()
	defer c.RUnlock()

	picked := c.core.RandPendingRegion(storeID, opts...)
	return picked
}

// GetAverageRegionSize returns the average region approximate size.
func (c *clusterInfo) GetAverageRegionSize() int64 {
c.RLock()
Expand Down
4 changes: 2 additions & 2 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,11 +576,11 @@ const (
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 8
defaultLeaderScheduleLimit = 4
defaultRegionScheduleLimit = 64
defaultReplicaScheduleLimit = 64
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultHotRegionScheduleLimit = 4
defaultStoreBalanceRate = 15
defaultTolerantSizeRatio = 0
defaultLowSpaceRatio = 0.8
Expand Down
5 changes: 5 additions & 0 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *
return bc.Regions.RandLeaderRegion(storeID, opts...)
}

// RandPendingRegion picks a random region that has a pending peer on the
// given store by delegating to the cluster's region set; opts are forwarded
// unchanged.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
	picked := bc.Regions.RandPendingRegion(storeID, opts...)
	return picked
}

// GetAverageRegionSize returns the average region approximate size.
func (bc *BasicCluster) GetAverageRegionSize() int64 {
return bc.Regions.GetAverageRegionSize()
Expand Down
9 changes: 7 additions & 2 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,12 +668,17 @@ func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
return randRegion(r.regions, opts...)
}

// RandLeaderRegion get a store's leader region by random
// RandPendingRegion draws a random region from the set kept in pendingPeers
// for storeID. opts are forwarded to randRegion.
func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
	candidates := r.pendingPeers[storeID]
	return randRegion(candidates, opts...)
}

// RandLeaderRegion draws a random region from the set kept in leaders for
// storeID. opts are forwarded to randRegion.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
	candidates := r.leaders[storeID]
	return randRegion(candidates, opts...)
}

// RandFollowerRegion get a store's follower region by random
// RandFollowerRegion draws a random region from the set kept in followers for
// storeID. opts are forwarded to randRegion.
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
	candidates := r.followers[storeID]
	return randRegion(candidates, opts...)
}
Expand Down
7 changes: 7 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ func HealthRegion() RegionOption {
}
}

// HealthRegionAllowPending returns a RegionOption that accepts a region only
// when it has no down peers and no learners. Pending peers are not inspected,
// so a region with pending peers can still pass.
func HealthRegionAllowPending() RegionOption {
	return func(region *RegionInfo) bool {
		if len(region.downPeers) != 0 {
			return false
		}
		return len(region.learners) == 0
	}
}

// RegionCreateOption used to create region.
type RegionCreateOption func(region *RegionInfo)

Expand Down
14 changes: 9 additions & 5 deletions server/schedule/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,15 @@ func (f StoreStateFilter) FilterTarget(opt Options, store *core.StoreInfo) bool
return true
}

if f.MoveRegion && f.filterMoveRegion(opt, store) {
return true
if f.MoveRegion {
// Only the target store considers pending peers, because more pending peers means the disk is slower.
if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}

if f.filterMoveRegion(opt, store) {
return true
}
}
return false
}
Expand All @@ -430,9 +437,6 @@ func (f StoreStateFilter) filterMoveRegion(opt Options, store *core.StoreInfo) b
return true
}

if opt.GetMaxPendingPeerCount() > 0 && store.GetPendingPeerCount() > int(opt.GetMaxPendingPeerCount()) {
return true
}
if uint64(store.GetSendingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetReceivingSnapCount()) > opt.GetMaxSnapshotCount() ||
uint64(store.GetApplyingSnapCount()) > opt.GetMaxSnapshotCount() {
Expand Down
45 changes: 45 additions & 0 deletions server/schedule/merge_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
package schedule

import (
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pkg/mock/mockcluster"
"github.com/pingcap/pd/pkg/mock/mockhbstream"
"github.com/pingcap/pd/pkg/mock/mockoption"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
)

// TestChecker is the `go test` entry point for this package: it hands the
// standard *testing.T to the check package's TestingT, which runs the suites
// registered through Suite.
func TestChecker(t *testing.T) {
TestingT(t)
}

var _ = Suite(&testMergeCheckerSuite{})

type testMergeCheckerSuite struct {
Expand Down Expand Up @@ -257,3 +263,42 @@ func (s *testMergeCheckerSuite) TestMatchPeers(c *C) {
},
})
}

// TestStorelimit checks how often the operator controller accepts the merge
// operators produced for region 2, first with the region at its original size
// and then after its approximate size has been raised to 2.
func (s *testMergeCheckerSuite) TestStorelimit(c *C) {
	oc := NewOperatorController(s.cluster, mockhbstream.NewHeartbeatStream())
	s.cluster.ScheduleOptions.SplitMergeInterval = time.Hour
	s.cluster.ScheduleOptions.StoreBalanceRate = 60

	// Move region 2 onto stores 2, 3 and 6, led by the peer on store 2.
	peers := []*metapb.Peer{
		{Id: 109, StoreId: 2},
		{Id: 110, StoreId: 3},
		{Id: 111, StoreId: 6},
	}
	leader := &metapb.Peer{Id: 109, StoreId: 2}
	s.regions[2] = s.regions[2].Clone(core.SetPeers(peers), core.WithLeader(leader))
	s.cluster.PutRegion(s.regions[2])

	ops := s.mc.Check(s.regions[2])
	c.Assert(ops, NotNil)

	// addThenRemove adds the current operators and removes them again,
	// asserting on every round that the add is accepted.
	addThenRemove := func(rounds int) {
		for round := 0; round < rounds; round++ {
			c.Assert(oc.AddOperator(ops...), IsTrue)
			for _, op := range ops {
				oc.RemoveOperator(op)
			}
		}
	}

	// The region size is at most 1MB here.
	addThenRemove(50)

	s.regions[2] = s.regions[2].Clone(
		core.SetApproximateSize(2),
		core.SetApproximateKeys(2),
	)
	s.cluster.PutRegion(s.regions[2])
	ops = s.mc.Check(s.regions[2])
	c.Assert(ops, NotNil)

	// The region size is now above 1MB but not above 20MB: five rounds are
	// accepted and the next add must be rejected.
	addThenRemove(5)
	c.Assert(oc.AddOperator(ops...), IsFalse)
}
23 changes: 19 additions & 4 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ const (
RegionOperatorWaitTime = 10 * time.Minute
// RegionInfluence represents the influence of an operator step, which is used by ratelimit.
RegionInfluence int64 = 1000
// smallRegionInfluence represents the influence of an operator step
// when the region size is smaller than smallRegionThreshold, which is used by ratelimit.
smallRegionInfluence int64 = 200
// smallRegionThreshold is the size below which a region is regarded as a small region.
smallRegionThreshold int64 = 20
)

// OperatorStep describes the basic scheduling steps that can not be subdivided.
Expand Down Expand Up @@ -98,9 +103,14 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool {
// Influence records the effect of adding a peer on the target store in
// opInfluence.
//
// The store gains the region's approximate size and one region. The step
// cost, which is used by the rate limit, depends on the region size:
//   - larger than smallRegionThreshold: RegionInfluence
//   - larger than core.EmptyRegionApproximateSize but not above the
//     threshold: smallRegionInfluence
//   - otherwise: no step cost is added
func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
	to := opInfluence.GetStoreInfluence(ap.ToStore)

	// Read the size once and apply it exactly once; the step cost is charged
	// by exactly one tier below, never unconditionally.
	regionSize := region.GetApproximateSize()
	to.RegionSize += regionSize
	to.RegionCount++
	switch {
	case regionSize > smallRegionThreshold:
		to.StepCost += RegionInfluence
	case regionSize > core.EmptyRegionApproximateSize:
		to.StepCost += smallRegionInfluence
	}
}

// AddLearner is an OperatorStep that adds a region learner peer.
Expand Down Expand Up @@ -128,9 +138,14 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool {
// Influence records the effect of adding a learner peer on the target store
// in opInfluence.
//
// The store gains the region's approximate size and one region. The step
// cost, which is used by the rate limit, depends on the region size:
//   - larger than smallRegionThreshold: RegionInfluence
//   - larger than core.EmptyRegionApproximateSize but not above the
//     threshold: smallRegionInfluence
//   - otherwise: no step cost is added
func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
	to := opInfluence.GetStoreInfluence(al.ToStore)

	// Read the size once and apply it exactly once; the step cost is charged
	// by exactly one tier below, never unconditionally.
	regionSize := region.GetApproximateSize()
	to.RegionSize += regionSize
	to.RegionCount++
	switch {
	case regionSize > smallRegionThreshold:
		to.StepCost += RegionInfluence
	case regionSize > core.EmptyRegionApproximateSize:
		to.StepCost += smallRegionInfluence
	}
}

// PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter.
Expand Down
34 changes: 17 additions & 17 deletions server/schedule/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *testOperatorSuite) newTestRegion(regionID uint64, leaderPeer uint64, pe
leader = peer
}
}
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(10), core.SetApproximateKeys(10))
regionInfo := core.NewRegionInfo(&region, leader, core.SetApproximateSize(50), core.SetApproximateKeys(50))
return regionInfo
}

Expand Down Expand Up @@ -124,71 +124,71 @@ func (s *testOperatorSuite) TestInfluence(c *C) {
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 0,
LeaderCount: 0,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

TransferLeader{FromStore: 1, ToStore: 2}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: 0,
RegionCount: 0,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

RemovePeer{FromStore: 1}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: false}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -1,
RegionSize: -10,
RegionSize: -50,
RegionCount: -1,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 1,
StepCost: 1000,
})

MergeRegion{IsPassive: true}.Influence(opInfluence, region)
c.Assert(*storeOpInfluence[1], DeepEquals, StoreInfluence{
LeaderSize: -10,
LeaderSize: -50,
LeaderCount: -2,
RegionSize: -10,
RegionSize: -50,
RegionCount: -2,
StepCost: 0,
})
c.Assert(*storeOpInfluence[2], DeepEquals, StoreInfluence{
LeaderSize: 10,
LeaderSize: 50,
LeaderCount: 1,
RegionSize: 10,
RegionSize: 50,
RegionCount: 0,
StepCost: 1000,
})
Expand Down
18 changes: 12 additions & 6 deletions server/schedule/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,28 @@ import (
"go.uber.org/zap"
)

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
// RegionSetInformer provides access to a shared informer of regions.
// It groups the region-level queries of a cluster.
// TODO: move to core package
type RegionSetInformer interface {
// RandFollowerRegion randomly gets a follower region of the given store.
RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
// RandLeaderRegion randomly gets a leader region of the given store.
RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
// RandPendingRegion returns a random region that has a pending peer on the store.
RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo
// GetAverageRegionSize returns the average region approximate size.
GetAverageRegionSize() int64
// GetStoreRegionCount presumably returns the number of regions on the
// store; NOTE(review): confirm against the implementation.
GetStoreRegionCount(storeID uint64) int
// GetRegion looks a region up by its ID.
// NOTE(review): the result for an unknown ID is not visible here.
GetRegion(id uint64) *core.RegionInfo
// GetAdjacentRegions returns two regions related to the given one,
// presumably its neighbours in key order; NOTE(review): confirm which
// result is which.
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
// ScanRegions presumably returns up to limit regions starting from
// startKey; NOTE(review): confirm the meaning of limit with the implementation.
ScanRegions(startKey []byte, limit int) []*core.RegionInfo
}

// Cluster provides an overview of a cluster's regions distribution.
type Cluster interface {
RegionSetInformer
GetStores() []*core.StoreInfo
GetStore(id uint64) *core.StoreInfo
GetRegion(id uint64) *core.RegionInfo

GetRegionStores(region *core.RegionInfo) []*core.StoreInfo
GetFollowerStores(region *core.RegionInfo) []*core.StoreInfo
GetLeaderStore(region *core.RegionInfo) *core.StoreInfo
GetAdjacentRegions(region *core.RegionInfo) (*core.RegionInfo, *core.RegionInfo)
ScanRegions(startKey []byte, limit int) []*core.RegionInfo

BlockStore(id uint64) error
UnblockStore(id uint64)

Expand Down
Loading

0 comments on commit ee98bf9

Please sign in to comment.