Skip to content

Commit

Permalink
adjust configurations
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Apr 30, 2019
1 parent fa672e9 commit 602dbe0
Show file tree
Hide file tree
Showing 18 changed files with 123 additions and 72 deletions.
2 changes: 2 additions & 0 deletions server/api/trend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package api

import (
"fmt"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -48,6 +49,7 @@ func (s *testTrendSuite) TestTrend(c *C) {
// Create 3 operators that transfer a leader, move a follower, and move a leader.
c.Assert(svr.GetHandler().AddTransferLeaderOperator(4, 2), IsNil)
c.Assert(svr.GetHandler().AddTransferPeerOperator(5, 2, 3), IsNil)
time.Sleep(1 * time.Second)
c.Assert(svr.GetHandler().AddTransferPeerOperator(6, 1, 3), IsNil)

// Complete the operators.
Expand Down
3 changes: 3 additions & 0 deletions server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,9 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error {
}

newStore := store.Clone(core.SetStoreState(metapb.StoreState_Offline))
opController := c.coordinator.opController
opController.SetOfflineStoreLimit(newStore.GetID())

log.Warn("store has been offline",
zap.Uint64("store-id", newStore.GetID()),
zap.String("store-address", newStore.GetAddress()))
Expand Down
20 changes: 14 additions & 6 deletions server/cluster_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,18 +233,18 @@ func (c *clusterInfo) UnblockStore(storeID uint64) {
c.core.UnblockStore(storeID)
}

// OverloadStore stops balancer from selecting the store.
func (c *clusterInfo) OverloadStore(storeID uint64) error {
// SetStoreOverload stops balancer from selecting the store.
func (c *clusterInfo) SetStoreOverload(storeID uint64) error {
c.Lock()
defer c.Unlock()
return c.core.OverloadStore(storeID)
return c.core.SetStoreOverload(storeID)
}

// UnburdenStore allows balancer to select the store.
func (c *clusterInfo) UnburdenStore(storeID uint64) {
// ResetStoreOverload allows balancer to select the store.
func (c *clusterInfo) ResetStoreOverload(storeID uint64) {
c.Lock()
defer c.Unlock()
c.core.UnburdenStore(storeID)
c.core.ResetStoreOverload(storeID)
}

// GetStores returns all stores in the cluster.
Expand Down Expand Up @@ -713,6 +713,14 @@ func (c *clusterInfo) GetStoreBucketRate() float64 {
return c.opt.GetStoreBucketRate()
}

// GetOfflineStoreMaxScheduleCost returns the offline-store max schedule cost
// as reported by the cluster's options (c.opt).
func (c *clusterInfo) GetOfflineStoreMaxScheduleCost() int64 {
return c.opt.GetOfflineStoreMaxScheduleCost()
}

// GetOfflineStoreBucketRate returns the offline-store bucket rate as reported
// by the cluster's options (c.opt).
func (c *clusterInfo) GetOfflineStoreBucketRate() float64 {
return c.opt.GetOfflineStoreBucketRate()
}

// GetTolerantSizeRatio returns the tolerant size ratio as reported by the
// cluster's options (c.opt).
func (c *clusterInfo) GetTolerantSizeRatio() float64 {
return c.opt.GetTolerantSizeRatio()
}
Expand Down
48 changes: 29 additions & 19 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,10 @@ type ScheduleConfig struct {
StoreMaxScheduleCost int64 `toml:"store-max-schedule-cost,omitempty" json:"store-max-schedule-cost"`
// StoreBucketRate is the maximum bucket rate for each store.
StoreBucketRate float64 `toml:"store-bucket-rate,omitempty" json:"store-bucket-rate"`
// OfflineStoreMaxScheduleCost is the maximum scheduling cost for an offline store.
OfflineStoreMaxScheduleCost int64 `toml:"offline-store-max-schedule-cost,omitempty" json:"offline-store-max-schedule-cost"`
// OfflineStoreBucketRate is the maximum bucket rate for an offline store.
OfflineStoreBucketRate float64 `toml:"offline-store-bucket-rate,omitempty" json:"offline-store-bucket-rate"`
// TolerantSizeRatio is the ratio of buffer size for balance scheduler.
TolerantSizeRatio float64 `toml:"tolerant-size-ratio,omitempty" json:"tolerant-size-ratio"`
//
Expand Down Expand Up @@ -540,6 +544,8 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
MaxScheduleCost: c.MaxScheduleCost,
StoreMaxScheduleCost: c.StoreMaxScheduleCost,
StoreBucketRate: c.StoreBucketRate,
OfflineStoreMaxScheduleCost: c.OfflineStoreMaxScheduleCost,
OfflineStoreBucketRate: c.OfflineStoreBucketRate,
TolerantSizeRatio: c.TolerantSizeRatio,
LowSpaceRatio: c.LowSpaceRatio,
HighSpaceRatio: c.HighSpaceRatio,
Expand All @@ -555,25 +561,27 @@ func (c *ScheduleConfig) clone() *ScheduleConfig {
}

const (
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 1000
defaultRegionScheduleLimit = 1000
defaultReplicaScheduleLimit = 1000
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultMaxScheduleCost = 0
defaultStoreMaxScheduleCost = 50
defaultStoreBucketRate = 0.05
defaultTolerantSizeRatio = 5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
defaultMaxReplicas = 3
defaultMaxSnapshotCount = 3
defaultMaxPendingPeerCount = 16
defaultMaxMergeRegionSize = 20
defaultMaxMergeRegionKeys = 200000
defaultSplitMergeInterval = 1 * time.Hour
defaultPatrolRegionInterval = 100 * time.Millisecond
defaultMaxStoreDownTime = 30 * time.Minute
defaultLeaderScheduleLimit = 8
defaultRegionScheduleLimit = 1024
defaultReplicaScheduleLimit = 1024
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultMaxScheduleCost = 0
defaultStoreMaxScheduleCost = 200
defaultStoreBucketRate = 100
defaultOfflineStoreMaxScheduleCost = 600
defaultOfflineStoreBucketRate = 300
defaultTolerantSizeRatio = 25
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
// defaultHotRegionCacheHitsThreshold is the low hit number threshold of the
// hot region.
defaultHotRegionCacheHitsThreshold = 3
Expand Down Expand Up @@ -623,6 +631,8 @@ func (c *ScheduleConfig) adjust(meta *configMetaData) error {
adjustFloat64(&c.TolerantSizeRatio, defaultTolerantSizeRatio)
}
adjustFloat64(&c.StoreBucketRate, defaultStoreBucketRate)
adjustInt64(&c.OfflineStoreMaxScheduleCost, defaultOfflineStoreMaxScheduleCost)
adjustFloat64(&c.OfflineStoreBucketRate, defaultOfflineStoreBucketRate)
adjustFloat64(&c.LowSpaceRatio, defaultLowSpaceRatio)
adjustFloat64(&c.HighSpaceRatio, defaultHighSpaceRatio)
adjustSchedulers(&c.Schedulers, defaultSchedulers)
Expand Down
1 change: 1 addition & 0 deletions server/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func newTestOperator(regionID uint64, regionEpoch *metapb.RegionEpoch, kind sche

func newTestScheduleConfig() (*ScheduleConfig, *scheduleOption, error) {
cfg := NewConfig()
cfg.Schedule.TolerantSizeRatio = 5
if err := cfg.Adjust(nil); err != nil {
return nil, nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions server/core/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ func (s *StoresInfo) UnblockStore(storeID uint64) {
s.stores[storeID] = store.Clone(SetStoreUnBlock())
}

// OverloadStore overloads a StoreInfo with storeID.
func (s *StoresInfo) OverloadStore(storeID uint64) errcode.ErrorCode {
// SetStoreOverload sets the StoreInfo with storeID as overloaded.
func (s *StoresInfo) SetStoreOverload(storeID uint64) errcode.ErrorCode {
op := errcode.Op("store.overload")
store, ok := s.stores[storeID]
if !ok {
Expand All @@ -556,14 +556,14 @@ func (s *StoresInfo) OverloadStore(storeID uint64) errcode.ErrorCode {
return nil
}

// UnburdenStore unburden a StoreInfo with storeID.
func (s *StoresInfo) UnburdenStore(storeID uint64) {
// ResetStoreOverload resets the overload status of the StoreInfo with storeID.
func (s *StoresInfo) ResetStoreOverload(storeID uint64) {
store, ok := s.stores[storeID]
if !ok {
log.Fatal("store is unburdened, but it is not found",
log.Fatal("store overload is reset, but it is not found",
zap.Uint64("store-id", storeID))
}
s.stores[storeID] = store.Clone(SetStoreUnburden())
s.stores[storeID] = store.Clone(ResetStoreOverload())
}

// GetStores gets a complete set of StoreInfo.
Expand Down
4 changes: 2 additions & 2 deletions server/core/store_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ func SetStoreOverload() StoreCreateOption {
}
}

// SetStoreUnburden allows balancer to select the store.
func SetStoreUnburden() StoreCreateOption {
// ResetStoreOverload allows balancer to select the store.
func ResetStoreOverload() StoreCreateOption {
return func(store *StoreInfo) {
store.overloaded = false
}
Expand Down
8 changes: 8 additions & 0 deletions server/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ func (o *scheduleOption) GetStoreBucketRate() float64 {
return o.load().StoreBucketRate
}

// GetOfflineStoreMaxScheduleCost returns the OfflineStoreMaxScheduleCost field
// of the configuration returned by o.load().
func (o *scheduleOption) GetOfflineStoreMaxScheduleCost() int64 {
return o.load().OfflineStoreMaxScheduleCost
}

// GetOfflineStoreBucketRate returns the OfflineStoreBucketRate field of the
// configuration returned by o.load().
func (o *scheduleOption) GetOfflineStoreBucketRate() float64 {
return o.load().OfflineStoreBucketRate
}

// GetTolerantSizeRatio returns the TolerantSizeRatio field of the
// configuration returned by o.load().
func (o *scheduleOption) GetTolerantSizeRatio() float64 {
return o.load().TolerantSizeRatio
}
Expand Down
12 changes: 6 additions & 6 deletions server/schedule/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ func (bc *BasicCluster) UnblockStore(storeID uint64) {
bc.Stores.UnblockStore(storeID)
}

// OverloadStore stops balancer from selecting the store.
func (bc *BasicCluster) OverloadStore(storeID uint64) error {
return bc.Stores.OverloadStore(storeID)
// SetStoreOverload stops balancer from selecting the store.
func (bc *BasicCluster) SetStoreOverload(storeID uint64) error {
return bc.Stores.SetStoreOverload(storeID)
}

// UnburdenStore allows balancer to select the store.
func (bc *BasicCluster) UnburdenStore(storeID uint64) {
bc.Stores.UnburdenStore(storeID)
// ResetStoreOverload allows balancer to select the store.
func (bc *BasicCluster) ResetStoreOverload(storeID uint64) {
bc.Stores.ResetStoreOverload(storeID)
}

// RandFollowerRegion returns a random region that has a follower on the store.
Expand Down
16 changes: 14 additions & 2 deletions server/schedule/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -502,8 +502,8 @@ const (
defaultMergeScheduleLimit = 8
defaultHotRegionScheduleLimit = 2
defaultMaxScheduleCost = 0
defaultStoreMaxScheduleCost = 50
defaultStoreBucketRate = 0.05
defaultStoreMaxScheduleCost = 200
defaultStoreBucketRate = 100
defaultTolerantSizeRatio = 2.5
defaultLowSpaceRatio = 0.8
defaultHighSpaceRatio = 0.6
Expand All @@ -521,6 +521,8 @@ type MockSchedulerOptions struct {
MaxScheduleCost int64
StoreMaxScheduleCost int64
StoreBucketRate float64
OfflineStoreMaxScheduleCost int64
OfflineStoreBucketRate float64
MaxSnapshotCount uint64
MaxPendingPeerCount uint64
MaxMergeRegionSize uint64
Expand Down Expand Up @@ -608,6 +610,16 @@ func (mso *MockSchedulerOptions) GetStoreBucketRate() float64 {
return mso.StoreBucketRate
}

// GetOfflineStoreMaxScheduleCost mock method. It returns the
// OfflineStoreMaxScheduleCost field stored on the mock options.
func (mso *MockSchedulerOptions) GetOfflineStoreMaxScheduleCost() int64 {
return mso.OfflineStoreMaxScheduleCost
}

// GetOfflineStoreBucketRate mock method. It returns the
// OfflineStoreBucketRate field stored on the mock options.
func (mso *MockSchedulerOptions) GetOfflineStoreBucketRate() float64 {
return mso.OfflineStoreBucketRate
}

// GetMaxSnapshotCount mock method
func (mso *MockSchedulerOptions) GetMaxSnapshotCount() uint64 {
return mso.MaxSnapshotCount
Expand Down
27 changes: 13 additions & 14 deletions server/schedule/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,13 @@ const (
// RegionOperatorWaitTime is the duration that when a region operator lives
// longer than it, the operator will be considered timeout.
RegionOperatorWaitTime = 10 * time.Minute
// RegionWeight reflects the influence which is caused by a region related step in an operator.
RegionWeight = 10
// LeaderWeight reflects the influence which is caused by a leader related step in an operator.
LeaderWeight = 1
)

const (
// SmallInfluence represents the influence of an operator step which may have a small influence on the store.
// In this file it is the amount added to a store's StepCost by the
// PromoteLearner, RemovePeer, MergeRegion and SplitRegion steps.
SmallInfluence = 1
// BigInfluence represents the influence of an operator step which may have a big influence on the store.
// In this file it is the amount added to a store's StepCost by the
// AddPeer and AddLearner steps.
BigInfluence = 100
)

// OperatorStep describes the basic scheduling steps that can not be subdivided.
Expand Down Expand Up @@ -69,10 +72,8 @@ func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionI

from.LeaderSize -= region.GetApproximateSize()
from.LeaderCount--
from.StepCost += LeaderWeight
to.LeaderSize += region.GetApproximateSize()
to.LeaderCount++
to.StepCost += LeaderWeight
}

// AddPeer is an OperatorStep that adds a region peer.
Expand Down Expand Up @@ -102,7 +103,7 @@ func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) {

to.RegionSize += region.GetApproximateSize()
to.RegionCount++
to.StepCost += RegionWeight
to.StepCost += BigInfluence
}

// AddLearner is an OperatorStep that adds a region learner peer.
Expand Down Expand Up @@ -132,7 +133,7 @@ func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo)

to.RegionSize += region.GetApproximateSize()
to.RegionCount++
to.StepCost += RegionWeight
to.StepCost += BigInfluence
}

// PromoteLearner is an OperatorStep that promotes a region learner peer to normal voter.
Expand All @@ -158,7 +159,7 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool {
// Influence calculates the store difference that the current step makes.
func (pl PromoteLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {
to := opInfluence.GetStoreInfluence(pl.ToStore)
to.StepCost += RegionWeight
to.StepCost += SmallInfluence
}

// RemovePeer is an OperatorStep that removes a region peer.
Expand All @@ -181,7 +182,7 @@ func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo)

from.RegionSize -= region.GetApproximateSize()
from.RegionCount--
from.StepCost += RegionWeight
from.StepCost += SmallInfluence
}

// MergeRegion is an OperatorStep that merge two regions.
Expand Down Expand Up @@ -215,10 +216,9 @@ func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo
for _, p := range region.GetPeers() {
o := opInfluence.GetStoreInfluence(p.GetStoreId())
o.RegionCount--
o.StepCost += RegionWeight
o.StepCost += SmallInfluence
if region.GetLeader().GetId() == p.GetId() {
o.LeaderCount--
o.StepCost += LeaderWeight
}
}
}
Expand All @@ -244,10 +244,9 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo
for _, p := range region.GetPeers() {
inf := opInfluence.GetStoreInfluence(p.GetStoreId())
inf.RegionCount++
inf.StepCost += RegionWeight
inf.StepCost += SmallInfluence
if region.GetLeader().GetId() == p.GetId() {
inf.LeaderCount++
inf.StepCost += LeaderWeight
}
}
}
Expand Down
15 changes: 12 additions & 3 deletions server/schedule/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ func (oc *OperatorController) AddOperator(ops ...*Operator) bool {
}
return false
}

for _, op := range ops {
oc.addOperatorLocked(op)
}
Expand Down Expand Up @@ -191,7 +190,7 @@ func (oc *OperatorController) removeOperatorLocked(op *Operator) {
opInfluence := NewTotalOpInfluence([]*Operator{op}, oc.cluster)
for storeID := range opInfluence.storesInfluence {
if oc.cluster.GetStore(storeID).IsOverloaded() {
oc.cluster.UnburdenStore(storeID)
oc.cluster.ResetStoreOverload(storeID)
}
}
oc.updateCounts(oc.operators)
Expand Down Expand Up @@ -505,8 +504,11 @@ func (oc *OperatorController) exceedStoreLimit(ops ...*Operator) bool {
if oc.storesLimit[storeID] == nil {
oc.storesLimit[storeID] = ratelimit.NewBucketWithRate(oc.cluster.GetStoreBucketRate(), oc.cluster.GetStoreMaxScheduleCost())
}
if opInfluence.GetStoreInfluence(storeID).StepCost == 0 {
continue
}
if oc.storesLimit[storeID].Available() < opInfluence.GetStoreInfluence(storeID).StepCost {
oc.cluster.OverloadStore(storeID)
oc.cluster.SetStoreOverload(storeID)
return true
}
}
Expand All @@ -523,3 +525,10 @@ func (oc *OperatorController) GetScheduleCost() int64 {
}
return scheduleCost
}

// SetOfflineStoreLimit is used to set the limit of a store which is in offline state.
// It stores a new rate-limit bucket for storeID in oc.storesLimit, built from
// the cluster's offline-store bucket rate and offline-store max schedule cost.
// Any bucket already stored for that store is overwritten.
func (oc *OperatorController) SetOfflineStoreLimit(storeID uint64) {
// Hold the controller lock while storesLimit is updated.
oc.Lock()
defer oc.Unlock()
oc.storesLimit[storeID] = ratelimit.NewBucketWithRate(oc.cluster.GetOfflineStoreBucketRate(), oc.cluster.GetOfflineStoreMaxScheduleCost())
}
Loading

0 comments on commit 602dbe0

Please sign in to comment.