Merge branch 'master' into auto-completion
hsqlu authored Mar 30, 2020
2 parents 492b46a + 1c0d87c commit 8a0e6af
Showing 4 changed files with 111 additions and 25 deletions.
28 changes: 27 additions & 1 deletion server/config_manager/config_manager.go
@@ -207,8 +207,9 @@ func (c *ConfigManager) CreateConfig(version *configpb.Version, component, compo
latestVersion := c.GetLatestVersion(component, componentID)
initVersion := &configpb.Version{Local: 0, Global: 0}
if localCfgs, ok := c.LocalCfgs[component]; ok {
if _, ok := localCfgs[componentID]; ok {
if local, ok := localCfgs[componentID]; ok {
// restart a component
local.updateLocalConfig(cfg)
if versionEqual(initVersion, version) {
status = &configpb.Status{Code: configpb.StatusCode_OK}
} else {
@@ -541,6 +542,31 @@ func (lc *LocalConfig) updateEntry(entry *configpb.ConfigEntry, version *configp
entries[entry.GetName()] = NewEntryValue(entry, version)
}

// updateLocalConfig updates a LocalConfig when there is a new config item.
func (lc *LocalConfig) updateLocalConfig(cfg string) error {
new := make(map[string]interface{})
if err := decodeConfigs(cfg, new); err != nil {
return err
}
old := lc.getConfigs()
updateItem(new, old)
return nil
}

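// updateItem recursively copies entries from new into old: keys missing from
// old are added, nested maps are merged the same way, and values that already
// exist in old are never overwritten.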
func updateItem(new, old map[string]interface{}) {
for key := range new {
if sub, ok := old[key]; ok {
oldSub, ok := sub.(map[string]interface{})
newSub, ok1 := new[key].(map[string]interface{})
if ok && ok1 {
updateItem(newSub, oldSub)
}
} else {
old[key] = new[key]
}
}
}

// GetVersion returns the local config version for a component.
func (lc *LocalConfig) GetVersion() *configpb.Version {
if lc == nil {
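The new updateLocalConfig/updateItem path merges a restarted component's reported config into the stored one without clobbering values the config manager already tracks. The following standalone sketch (not part of the commit; mergeMissing and the sample maps are invented for illustration) mimics that merge on plain nested maps:

```go
package main

import "fmt"

// mergeMissing copies keys from src into dst only when dst does not already
// contain them; nested maps are merged recursively and existing dst values win.
// It mirrors the behavior of updateItem in config_manager.go.
func mergeMissing(src, dst map[string]interface{}) {
	for key, val := range src {
		if existing, ok := dst[key]; ok {
			dstSub, okDst := existing.(map[string]interface{})
			srcSub, okSrc := val.(map[string]interface{})
			if okDst && okSrc {
				mergeMissing(srcSub, dstSub)
			}
			continue // a scalar that already exists in dst is kept as-is
		}
		dst[key] = val
	}
}

func main() {
	// What the config manager already stores for the component.
	stored := map[string]interface{}{"log-level": "debug"}
	// What a restarted component reports, with an extra section.
	reported := map[string]interface{}{
		"log-level": "info", // ignored: the stored value wins
		"rocksdb":   map[string]interface{}{"wal-recovery-mode": 1},
	}
	mergeMissing(reported, stored)
	fmt.Println(stored) // map[log-level:debug rocksdb:map[wal-recovery-mode:1]]
}
```

This is the behavior exercised by TestCreateNewItem below: re-creating the config adds the previously unseen rocksdb sections while log-level stays at "debug".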
65 changes: 65 additions & 0 deletions server/config_manager/config_manager_test.go
@@ -554,3 +554,68 @@ log-level = "debug"
c.Assert(status.GetCode(), Equals, configpb.StatusCode_COMPONENT_ID_NOT_FOUND)
c.Assert(config, Equals, "")
}

func (s *testComponentsConfigSuite) TestCreateNewItem(c *C) {
cfgData := `
log-level = "debug"
`
cfg := NewConfigManager(nil)

v, config, status := cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
expect := `log-level = "debug"
`
c.Assert(config, Equals, expect)
cfgData1 := `
log-level = "info"
[rocksdb]
wal-recovery-mode = 1
[rocksdb.defaultcf]
block-size = "12KB"
disable-block-cache = false
compression-per-level = [
"no",
"lz4",
]
[rocksdb.defaultcf.titan]
discardable-ratio = 0.00156
`
v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData1)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
expect1 := `log-level = "debug"
[rocksdb]
wal-recovery-mode = 1
[rocksdb.defaultcf]
block-size = "12KB"
compression-per-level = ["no", "lz4"]
disable-block-cache = false
[rocksdb.defaultcf.titan]
discardable-ratio = 0.00156
`
c.Assert(config, Equals, expect1)
cfgData2 := `
log-level = "info"
[rocksdb]
wal-recovery-mode = 2
[rocksdb.defaultcf]
block-size = "10KB"
disable-block-cache = true
compression-per-level = [
"lz4",
"no",
]
[rocksdb.defaultcf.titan]
discardable-ratio = 0.00211
`
v, config, status = cfg.CreateConfig(&configpb.Version{Global: 0, Local: 0}, "tikv", "tikv1", cfgData2)
c.Assert(v, DeepEquals, &configpb.Version{Global: 0, Local: 0})
c.Assert(status.GetCode(), Equals, configpb.StatusCode_OK)
c.Assert(config, Equals, expect1)
}
39 changes: 15 additions & 24 deletions server/schedulers/hot_region.go
@@ -71,11 +71,13 @@ const (
// HotWriteRegionType is hot write region scheduler type.
HotWriteRegionType = "hot-write-region"

hotRegionLimitFactor = 0.75
minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
)

// schedulePeerPr is the probability of scheduling the hot peer.
var schedulePeerPr = 0.66

type hotScheduler struct {
name string
*BaseScheduler
@@ -153,12 +155,12 @@ func (h *hotScheduler) IsScheduleAllowed(cluster opt.Cluster) bool {
}

func (h *hotScheduler) allowBalanceLeader(cluster opt.Cluster) bool {
return h.OpController.OperatorCount(operator.OpHotRegion) < minUint64(h.leaderLimit, cluster.GetHotRegionScheduleLimit()) &&
return h.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetHotRegionScheduleLimit() &&
h.OpController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit()
}

func (h *hotScheduler) allowBalanceRegion(cluster opt.Cluster) bool {
return h.OpController.OperatorCount(operator.OpHotRegion) < minUint64(h.peerLimit, cluster.GetHotRegionScheduleLimit())
return h.OpController.OperatorCount(operator.OpHotRegion) < cluster.GetHotRegionScheduleLimit()
}

func (h *hotScheduler) Schedule(cluster opt.Cluster) []*operator.Operator {
@@ -400,14 +402,19 @@ func (h *hotScheduler) balanceHotReadRegions(cluster opt.Cluster) []*operator.Op

func (h *hotScheduler) balanceHotWriteRegions(cluster opt.Cluster) []*operator.Operator {
// prefer to balance by peer
peerSolver := newBalanceSolver(h, cluster, write, movePeer)
ops := peerSolver.solve()
if len(ops) > 0 {
return ops
s := h.r.Intn(100)
switch {
case s < int(schedulePeerPr*100):
peerSolver := newBalanceSolver(h, cluster, write, movePeer)
ops := peerSolver.solve()
if len(ops) > 0 {
return ops
}
default:
}

leaderSolver := newBalanceSolver(h, cluster, write, transferLeader)
ops = leaderSolver.solve()
ops := leaderSolver.solve()
if len(ops) > 0 {
return ops
}
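The change above replaces the unconditional peer-first preference with a randomized one: with probability schedulePeerPr the write scheduler tries the move-peer solver first and falls back to transfer-leader only if that produces no operator. A minimal standalone sketch of the gate (the helper name pickSolver and the fixed seed are illustrative, not from the commit):

```go
package main

import (
	"fmt"
	"math/rand"
)

// schedulePeerPr mirrors the package-level variable added in hot_region.go:
// the probability of preferring the move-peer solver for hot write regions.
var schedulePeerPr = 0.66

// pickSolver applies the same gate as balanceHotWriteRegions: draw an integer
// in [0, 100) and compare it against schedulePeerPr * 100.
func pickSolver(r *rand.Rand) string {
	if r.Intn(100) < int(schedulePeerPr*100) {
		return "move-peer"
	}
	return "transfer-leader"
}

func main() {
	r := rand.New(rand.NewSource(1)) // fixed seed so the demo is repeatable
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		counts[pickSolver(r)]++
	}
	fmt.Println(counts) // roughly two thirds of the draws pick move-peer
}
```

The hot_test.go hunk at the bottom pins schedulePeerPr to 1.0 in init() so the existing write-region tests keep taking the peer path deterministically.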
@@ -966,7 +973,6 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
case movePeer:
srcPeer := bs.cur.region.GetStorePeer(bs.cur.srcStoreID) // checked in getRegionAndSrcPeer
dstPeer := &metapb.Peer{StoreId: bs.cur.dstStoreID, IsLearner: srcPeer.IsLearner}
bs.sche.peerLimit = bs.sche.adjustBalanceLimit(bs.cur.srcStoreID, bs.stLoadDetail)
op, err = operator.CreateMovePeerOperator(
"move-hot-"+bs.rwTy.String()+"-region",
bs.cluster,
@@ -981,7 +987,6 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
if bs.cur.region.GetStoreVoter(bs.cur.dstStoreID) == nil {
return nil, nil
}
bs.sche.leaderLimit = bs.sche.adjustBalanceLimit(bs.cur.srcStoreID, bs.stLoadDetail)
op, err = operator.CreateTransferLeaderOperator(
"transfer-hot-"+bs.rwTy.String()+"-leader",
bs.cluster,
@@ -1013,20 +1018,6 @@ func (bs *balanceSolver) buildOperators() ([]*operator.Operator, []Influence) {
return []*operator.Operator{op}, []Influence{infl}
}

func (h *hotScheduler) adjustBalanceLimit(storeID uint64, loadDetail map[uint64]*storeLoadDetail) uint64 {
srcStoreStatistics := loadDetail[storeID]

var hotRegionTotalCount int
for _, m := range loadDetail {
hotRegionTotalCount += len(m.HotPeers)
}

avgRegionCount := float64(hotRegionTotalCount) / float64(len(loadDetail))
// Multiplied by hotRegionLimitFactor to avoid transfer back and forth
limit := uint64((float64(len(srcStoreStatistics.HotPeers)) - avgRegionCount) * hotRegionLimitFactor)
return maxUint64(limit, 1)
}

func (h *hotScheduler) GetHotReadStatus() *statistics.StoreHotPeersInfos {
h.RLock()
defer h.RUnlock()
4 changes: 4 additions & 0 deletions server/schedulers/hot_test.go
@@ -29,6 +29,10 @@ import (
"github.com/pingcap/pd/v4/server/statistics"
)

func init() {
schedulePeerPr = 1.0
}

var _ = Suite(&testHotWriteRegionSchedulerSuite{})
var _ = Suite(&testHotSchedulerSuite{})
