Merge branch 'master' into issue-3886
disksing authored Jul 22, 2021
2 parents afcd08c + 84030ff commit 65bcbe6
Showing 13 changed files with 90 additions and 19 deletions.
4 changes: 0 additions & 4 deletions pkg/errs/errno.go
@@ -28,10 +28,6 @@ var (
ErrIncorrectSystemTime = errors.Normalize("incorrect system time", errors.RFCCodeText("PD:common:ErrIncorrectSystemTime"))
)

// The internal error which is generated in PD project.
// main errors
var ()

// tso errors
var (
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
13 changes: 10 additions & 3 deletions pkg/movingaverage/exponential_moving_average.go
@@ -24,9 +24,10 @@ type EMA struct {
// unreasonably large effect on early forecasts. This problem can be overcome by allowing the process to evolve for
// a reasonable number of periods (say 10 or more) and using the arithmetic average of the demand during
// those periods as the initial forecast.
wakeNum uint64
count uint64
value float64
wakeNum uint64
count uint64
value float64
instantaneous float64
}

// NewEMA returns an EMA.
@@ -44,6 +45,7 @@ func NewEMA(decays ...float64) *EMA {

// Add adds a data point.
func (e *EMA) Add(num float64) {
e.instantaneous = num
if e.count < e.wakeNum {
e.count++
e.value += num
@@ -78,3 +80,8 @@ func (e *EMA) Set(n float64) {
e.value = n
e.count = 1
}

// GetInstantaneous returns the value just added.
func (e *EMA) GetInstantaneous() float64 {
return e.instantaneous
}
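
The struct comment above describes the usual EMA warm-up: average the first wakeNum samples before switching to exponential decay, so early forecasts are not skewed by the initial value. A minimal usage sketch of the new accessor, assuming the package's module path is github.com/tikv/pd and the default decay from NewEMA(); the printed numbers are illustrative only:

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/movingaverage" // assumed module path
)

func main() {
	ema := movingaverage.NewEMA() // default decay

	// Steady load followed by a single spike.
	for i := 0; i < 20; i++ {
		ema.Add(100)
	}
	ema.Add(1000)

	// Get() is the smoothed estimate; GetInstantaneous() is the last raw sample (1000 here).
	fmt.Printf("smoothed=%.2f instantaneous=%.2f\n", ema.Get(), ema.GetInstantaneous())
}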
5 changes: 5 additions & 0 deletions pkg/movingaverage/hull_moving_average.go
@@ -65,3 +65,8 @@ func (h *HMA) Set(n float64) {
h.Reset()
h.Add(n)
}

// GetInstantaneous returns the value just added.
func (h *HMA) GetInstantaneous() float64 {
return h.wma[1].GetInstantaneous()
}
8 changes: 8 additions & 0 deletions pkg/movingaverage/max_filter.go
@@ -60,3 +60,11 @@ func (r *MaxFilter) Set(n float64) {
r.records[0] = n
r.count = 1
}

// GetInstantaneous returns the value just added.
func (r *MaxFilter) GetInstantaneous() float64 {
if r.count == 0 {
return 0
}
return r.records[(r.count-1)%r.size]
}
15 changes: 12 additions & 3 deletions pkg/movingaverage/median_filter.go
@@ -19,9 +19,10 @@ import "github.com/montanaflynn/stats"
// There are at most `size` data points for calculating.
// References: https://en.wikipedia.org/wiki/Median_filter.
type MedianFilter struct {
records []float64
size uint64
count uint64
records []float64
size uint64
count uint64
instantaneous float64
}

// NewMedianFilter returns a MedianFilter.
@@ -34,6 +35,7 @@ func NewMedianFilter(size int) *MedianFilter {

// Add adds a data point.
func (r *MedianFilter) Add(n float64) {
r.instantaneous = n
r.records[r.count%r.size] = n
r.count++
}
@@ -53,11 +55,18 @@ func (r *MedianFilter) Get() float64 {

// Reset cleans the data set.
func (r *MedianFilter) Reset() {
r.instantaneous = 0
r.count = 0
}

// Set = Reset + Add.
func (r *MedianFilter) Set(n float64) {
r.instantaneous = n
r.records[0] = n
r.count = 1
}

// GetInstantaneous returns the value just added.
func (r *MedianFilter) GetInstantaneous() float64 {
return r.instantaneous
}
2 changes: 2 additions & 0 deletions pkg/movingaverage/moving_average.go
@@ -20,6 +20,8 @@ type MovingAvg interface {
Add(data float64)
// Get returns the moving average.
Get() float64
// GetInstantaneous returns the value just added.
GetInstantaneous() float64
// Reset cleans the data set.
Reset()
// Set = Reset + Add
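
Adding GetInstantaneous to the MovingAvg interface lets callers (such as RollingStoreStats in server/statistics/store.go below) read the latest raw sample through any implementation. A minimal sketch, assuming the same module path; spikeRatio is a hypothetical helper, not part of this change:

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/movingaverage" // assumed module path
)

// spikeRatio is a hypothetical helper: how far the latest raw sample
// deviates from the smoothed value of any MovingAvg implementation.
func spikeRatio(ma movingaverage.MovingAvg) float64 {
	avg := ma.Get()
	if avg == 0 {
		return 0
	}
	return ma.GetInstantaneous() / avg
}

func main() {
	for _, ma := range []movingaverage.MovingAvg{
		movingaverage.NewEMA(),
		movingaverage.NewMedianFilter(5),
		movingaverage.NewMaxFilter(5),
	} {
		for i := 0; i < 10; i++ {
			ma.Add(100)
		}
		ma.Add(500) // a single spike
		fmt.Printf("%T: spike ratio %.2f\n", ma, spikeRatio(ma))
	}
}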
11 changes: 11 additions & 0 deletions pkg/movingaverage/moving_average_test.go
@@ -70,6 +70,13 @@ func checkSet(c *C, ma MovingAvg, data []float64, expected []float64) {
checkAdd(c, ma, data[1:], expected[1:])
}

// checkInstantaneous checks GetInstantaneous
func checkInstantaneous(c *C, ma MovingAvg) {
value := 100.000000
ma.Add(value)
c.Assert(ma.GetInstantaneous(), Equals, value)
}

func (t *testMovingAvg) TestMedianFilter(c *C) {
var empty float64 = 0
data := []float64{2, 4, 2, 800, 600, 6, 3}
@@ -103,12 +110,16 @@ func (t *testMovingAvg) TestMovingAvg(c *C) {
}, {
ma: NewMedianFilter(5),
expected: []float64{1.000000, 1.000000, 1.000000, 1.000000, 1.000000, 1.000000, 1.000000, 1.000000},
}, {
ma: NewMaxFilter(5),
expected: []float64{1.000000, 1.000000, 1.000000, 1.000000, 5.000000, 5.000000, 5.000000, 5.000000},
},
}
for _, test := range testCases {
c.Assert(test.ma.Get(), Equals, empty)
checkReset(c, test.ma, empty)
checkAdd(c, test.ma, data, test.expected)
checkSet(c, test.ma, data, test.expected)
checkInstantaneous(c, test.ma)
}
}
8 changes: 8 additions & 0 deletions pkg/movingaverage/weight_moving_average.go
@@ -75,3 +75,11 @@ func (w *WMA) Set(n float64) {
w.Reset()
w.Add(n)
}

// GetInstantaneous returns the value just added.
func (w *WMA) GetInstantaneous() float64 {
if w.count == 0 {
return 0
}
return w.records[(w.count-1)%w.size]
}
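
Unlike EMA and MedianFilter, which cache the last sample in an instantaneous field, WMA and MaxFilter read it back out of their ring buffers. A standalone sketch of that indexing, assuming writes go to records[count%size] as in MedianFilter.Add above:

package main

import "fmt"

func main() {
	const size = 3
	records := make([]float64, size)
	var count uint64

	// Each write lands at records[count%size], so after any write the
	// most recent sample is at records[(count-1)%size].
	for _, v := range []float64{10, 20, 30, 40, 50} {
		records[count%size] = v
		count++
		fmt.Printf("latest=%.0f buffer=%v\n", records[(count-1)%size], records)
	}
}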
7 changes: 4 additions & 3 deletions server/api/region_test.go
@@ -313,17 +313,18 @@ func (s *testRegionSuite) TestAccelerateRegionsScheduleInRange(c *C) {

func (s *testRegionSuite) TestScatterRegions(c *C) {
r1 := newTestRegionInfo(601, 13, []byte("b1"), []byte("b2"))
r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 13}, &metapb.Peer{Id: 6, StoreId: 13})
r1.GetMeta().Peers = append(r1.GetMeta().Peers, &metapb.Peer{Id: 5, StoreId: 14}, &metapb.Peer{Id: 6, StoreId: 15})
r2 := newTestRegionInfo(602, 13, []byte("b2"), []byte("b3"))
r2.GetMeta().Peers = append(r2.GetMeta().Peers, &metapb.Peer{Id: 7, StoreId: 13}, &metapb.Peer{Id: 8, StoreId: 13})
r2.GetMeta().Peers = append(r2.GetMeta().Peers, &metapb.Peer{Id: 7, StoreId: 14}, &metapb.Peer{Id: 8, StoreId: 15})
r3 := newTestRegionInfo(603, 13, []byte("b4"), []byte("b4"))
r3.GetMeta().Peers = append(r3.GetMeta().Peers, &metapb.Peer{Id: 9, StoreId: 13}, &metapb.Peer{Id: 10, StoreId: 13})
r3.GetMeta().Peers = append(r3.GetMeta().Peers, &metapb.Peer{Id: 9, StoreId: 14}, &metapb.Peer{Id: 10, StoreId: 15})
mustRegionHeartbeat(c, s.svr, r1)
mustRegionHeartbeat(c, s.svr, r2)
mustRegionHeartbeat(c, s.svr, r3)
mustPutStore(c, s.svr, 13, metapb.StoreState_Up, []*metapb.StoreLabel{})
mustPutStore(c, s.svr, 14, metapb.StoreState_Up, []*metapb.StoreLabel{})
mustPutStore(c, s.svr, 15, metapb.StoreState_Up, []*metapb.StoreLabel{})
mustPutStore(c, s.svr, 16, metapb.StoreState_Up, []*metapb.StoreLabel{})
body := fmt.Sprintf(`{"start_key":"%s", "end_key": "%s"}`, hex.EncodeToString([]byte("b1")), hex.EncodeToString([]byte("b3")))

err := postJSON(testDialClient, fmt.Sprintf("%s/regions/scatter", s.urlPrefix), []byte(body))
4 changes: 4 additions & 0 deletions server/schedule/operator_controller_test.go
@@ -604,6 +604,10 @@ func (t *testOperatorControllerSuite) TestStoreLimitWithMerge(c *C) {
}),
core.WithLeader(&metapb.Peer{Id: 109, StoreId: 2}),
)

// set to a small rate to reduce unstable possibility.
tc.SetAllStoresLimit(storelimit.AddPeer, 0.0000001)
tc.SetAllStoresLimit(storelimit.RemovePeer, 0.0000001)
tc.PutRegion(regions[2])
// The size of Region is less or equal than 1MB.
for i := 0; i < 50; i++ {
27 changes: 23 additions & 4 deletions server/schedule/region_scatterer.go
@@ -301,10 +301,14 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
}

scatterWithSameEngine(ordinaryPeers, r.ordinaryEngine)
// FIXME: target leader only considers the ordinary storesmaybe we need to consider the
// FIXME: target leader only considers the ordinary stores, maybe we need to consider the
// special engine stores if the engine supports to become a leader. But now there is only
// one engine, tiflash, which does not support the leader, so don't consider it for now.
targetLeader := r.selectAvailableLeaderStores(group, targetPeers, r.ordinaryEngine)
targetLeader := r.selectAvailableLeaderStore(group, targetPeers, r.ordinaryEngine)
if targetLeader == 0 {
scatterCounter.WithLabelValues("no-leader", "").Inc()
return nil
}

for engine, peers := range specialPeers {
ctx, ok := r.specialEngines[engine]
@@ -315,6 +319,11 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
scatterWithSameEngine(peers, ctx)
}

if isSameDistribution(region, targetPeers, targetLeader) {
scatterCounter.WithLabelValues("unnecessary", "").Inc()
r.Put(targetPeers, targetLeader, group)
return nil
}
op, err := operator.CreateScatterRegionOperator("scatter-region", r.cluster, region, targetPeers, targetLeader)
if err != nil {
scatterCounter.WithLabelValues("fail", "").Inc()
@@ -333,6 +342,16 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
return op
}

func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) bool {
peers := region.GetPeers()
for _, peer := range peers {
if _, ok := targetPeers[peer.GetStoreId()]; !ok {
return false
}
}
return region.GetLeader().GetStoreId() == targetLeader
}

func (r *RegionScatterer) selectCandidates(region *core.RegionInfo, sourceStoreID uint64, selectedStores map[uint64]struct{}, context engineContext) []uint64 {
sourceStore := r.cluster.GetStore(sourceStoreID)
if sourceStore == nil {
@@ -400,9 +419,9 @@ func (r *RegionScatterer) selectStore(group string, peer *metapb.Peer, sourceSto
return newPeer
}

// selectAvailableLeaderStores select the target leader store from the candidates. The candidates would be collected by
// selectAvailableLeaderStore select the target leader store from the candidates. The candidates would be collected by
// the existed peers store depended on the leader counts in the group level.
func (r *RegionScatterer) selectAvailableLeaderStores(group string, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
func (r *RegionScatterer) selectAvailableLeaderStore(group string, peers map[uint64]*metapb.Peer, context engineContext) uint64 {
leaderCandidateStores := make([]uint64, 0)
for storeID := range peers {
store := r.cluster.GetStore(storeID)
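
The new isSameDistribution check lets scatterRegion record the placement as "unnecessary" and skip creating an operator when the region's peers and leader already match the computed target. A simplified standalone sketch of the predicate over plain store IDs, not the real core.RegionInfo and metapb.Peer types:

package main

import "fmt"

// sameDistribution reports whether every current peer's store already appears
// in the target peer set and the leader already sits on the target leader store.
func sameDistribution(peerStores []uint64, leaderStore uint64, targetPeers map[uint64]struct{}, targetLeader uint64) bool {
	for _, s := range peerStores {
		if _, ok := targetPeers[s]; !ok {
			return false
		}
	}
	return leaderStore == targetLeader
}

func main() {
	target := map[uint64]struct{}{1: {}, 2: {}, 3: {}}

	// Already scattered as requested: no operator is needed.
	fmt.Println(sameDistribution([]uint64{1, 2, 3}, 1, target, 1)) // true

	// Leader differs from the target leader: an operator is still generated.
	fmt.Println(sameDistribution([]uint64{1, 2, 3}, 2, target, 1)) // false
}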
2 changes: 2 additions & 0 deletions server/schedule/region_scatterer_test.go
@@ -299,6 +299,8 @@ func (s *testScatterRegionSuite) TestScatterGroupInConcurrency(c *C) {
// Add 5 stores.
for i := uint64(1); i <= 5; i++ {
tc.AddRegionStore(i, 0)
// prevent store from being disconnected
tc.SetStoreLastHeartbeatInterval(i, -10*time.Minute)
}

testcases := []struct {
3 changes: 1 addition & 2 deletions server/statistics/store.go
@@ -208,15 +208,14 @@ func (r *RollingStoreStats) GetLoad(k StoreStatKind) float64 {
}

// GetInstantLoad returns store's instant load.
// MovingAvgs do not support GetInstantaneous() so they return average values.
func (r *RollingStoreStats) GetInstantLoad(k StoreStatKind) float64 {
r.RLock()
defer r.RUnlock()
switch k {
case StoreReadBytes, StoreReadKeys, StoreReadQuery, StoreWriteBytes, StoreWriteKeys, StoreWriteQuery:
return r.timeMedians[k].GetInstantaneous()
case StoreCPUUsage, StoreDiskReadRate, StoreDiskWriteRate:
return r.movingAvgs[k].Get()
return r.movingAvgs[k].GetInstantaneous()
}
return 0
}
