Skip to content

Commit

Permalink
scheduler: use pending amp in hot region scheduler (#3926)
Browse files Browse the repository at this point in the history
* use pending amp factor

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* add query test

Signed-off-by: lhy1024 <admin@liudos.us>

* add more config check

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

* address comment

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
lhy1024 and ti-chi-bot authored Aug 7, 2021
1 parent 8481fab commit 5d35e13
Show file tree
Hide file tree
Showing 8 changed files with 490 additions and 250 deletions.
136 changes: 69 additions & 67 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,13 +333,14 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en
// AddRegionWithReadInfo adds region with specified leader, followers and read info.
func (mc *Cluster) AddRegionWithReadInfo(
regionID uint64, leaderStoreID uint64,
readBytes, readKeys uint64,
readBytes, readKeys, readQuery uint64,
reportInterval uint64,
otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes))
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
r = r.Clone(core.SetReadQuery(readQuery))
filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
if len(filledNums) > 0 {
filledNum = filledNums[0]
Expand Down Expand Up @@ -381,12 +382,13 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStor
// AddRegionLeaderWithReadInfo add region leader read info
func (mc *Cluster) AddRegionLeaderWithReadInfo(
regionID uint64, leaderStoreID uint64,
readBytes, readKeys uint64,
readBytes, readKeys, readQuery uint64,
reportInterval uint64,
otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes))
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReadQuery(readQuery))
r = r.Clone(core.SetReportInterval(reportInterval))
filledNum := mc.HotCache.GetFilledPeriod(statistics.ReadFlow)
if len(filledNums) > 0 {
Expand All @@ -407,13 +409,14 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo(
// AddLeaderRegionWithWriteInfo adds region with specified leader and peers write info.
func (mc *Cluster) AddLeaderRegionWithWriteInfo(
regionID uint64, leaderStoreID uint64,
writtenBytes, writtenKeys uint64,
writtenBytes, writtenKeys, writtenQuery uint64,
reportInterval uint64,
otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetWrittenBytes(writtenBytes))
r = r.Clone(core.SetWrittenKeys(writtenKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
r = r.Clone(core.SetWrittenQuery(writtenQuery))

filledNum := mc.HotCache.GetFilledPeriod(statistics.WriteFlow)
if len(filledNums) > 0 {
Expand Down Expand Up @@ -491,11 +494,9 @@ func (mc *Cluster) UpdateRegionCount(storeID uint64, regionCount int) {

// UpdateSnapshotCount updates the receiving-snapshot count reported in
// the store's heartbeat stats.
func (mc *Cluster) UpdateSnapshotCount(storeID uint64, snapshotCount int) {
	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
		newStats.ReceivingSnapCount = uint32(snapshotCount)
	})
}

// UpdatePendingPeerCount updates store pending peer count.
Expand All @@ -507,91 +508,92 @@ func (mc *Cluster) UpdatePendingPeerCount(storeID uint64, pendingPeerCount int)

// UpdateStorageRatio updates the store's storage stats so that used and
// available space are the given fractions of the default capacity.
func (mc *Cluster) UpdateStorageRatio(storeID uint64, usedRatio, availableRatio float64) {
	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
		newStats.Capacity = defaultStoreCapacity
		newStats.UsedSize = uint64(float64(newStats.Capacity) * usedRatio)
		newStats.Available = uint64(float64(newStats.Capacity) * availableRatio)
	})
}

// UpdateStorageWrittenStats updates the store's written bytes and written keys.
func (mc *Cluster) UpdateStorageWrittenStats(storeID, bytesWritten, keysWritten uint64) {
	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
		newStats.BytesWritten = bytesWritten
		newStats.KeysWritten = keysWritten
	})
}

// UpdateStorageReadStats updates the store's read bytes and read keys.
// (Comment fixed: this updates read stats, not written bytes.)
func (mc *Cluster) UpdateStorageReadStats(storeID, bytesRead, keysRead uint64) {
	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
		newStats.BytesRead = bytesRead
		newStats.KeysRead = keysRead
	})
}

// UpdateStorageWrittenBytes updates the store's written bytes; written keys
// and put-query counts are derived at a fixed ratio of 100 bytes per key/query.
func (mc *Cluster) UpdateStorageWrittenBytes(storeID uint64, bytesWritten uint64) {
	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
		newStats.BytesWritten = bytesWritten
		newStats.KeysWritten = bytesWritten / 100
		newStats.QueryStats = &pdpb.QueryStats{
			Put: bytesWritten / 100,
		}
	})
}

// UpdateStorageReadBytes updates the store's read bytes; read keys are
// derived at a fixed ratio of 100 bytes per key.
func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
		newStats.BytesRead = bytesRead
		newStats.KeysRead = bytesRead / 100
	})
}

// UpdateStorageWrittenKeys updates the store's written keys; written bytes
// are derived at a fixed ratio of 100 bytes per key.
func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) {
	mc.updateStorageStatistics(storeID, func(newStats *pdpb.StoreStats) {
		newStats.KeysWritten = keysWritten
		newStats.BytesWritten = keysWritten * 100
	})
}

// UpdateStorageReadKeys updates the store's read keys; read bytes are
// derived at a fixed ratio of 100 bytes per key.
func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
	mc.updateStorageStatistics(storeID, func(stats *pdpb.StoreStats) {
		stats.BytesRead = keysRead * 100
		stats.KeysRead = keysRead
	})
}

// UpdateStorageReadQuery updates the store's read queries, spreading the
// total evenly across coprocessor, scan and get; read bytes are derived
// at a fixed ratio of 100 bytes per query.
func (mc *Cluster) UpdateStorageReadQuery(storeID uint64, queryRead uint64) {
	mc.updateStorageStatistics(storeID, func(stats *pdpb.StoreStats) {
		third := queryRead / 3
		stats.QueryStats = &pdpb.QueryStats{
			Coprocessor: third,
			Scan:        third,
			Get:         third,
		}
		stats.BytesRead = queryRead * 100
	})
}

// UpdateStorageWriteQuery updates the store's write queries, spreading the
// total evenly across put, delete and delete-range; written bytes are
// derived at a fixed ratio of 100 bytes per query.
func (mc *Cluster) UpdateStorageWriteQuery(storeID uint64, queryWrite uint64) {
	mc.updateStorageStatistics(storeID, func(stats *pdpb.StoreStats) {
		third := queryWrite / 3
		stats.QueryStats = &pdpb.QueryStats{
			Put:         third,
			Delete:      third,
			DeleteRange: third,
		}
		stats.BytesWritten = queryWrite * 100
	})
}

func (mc *Cluster) updateStorageStatistics(storeID uint64, update func(*pdpb.StoreStats)) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysRead = keysRead
newStats.BytesRead = keysRead * 100
update(newStats)
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
Expand Down
20 changes: 20 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,26 @@ func SetReadKeys(v uint64) RegionCreateOption {
}
}

// SetReadQuery sets the read query stats for the region, splitting the
// total evenly across coprocessor, get and scan.
func SetReadQuery(v uint64) RegionCreateOption {
	third := v / 3
	return SetQueryStats(&pdpb.QueryStats{
		Coprocessor: third,
		Get:         third,
		Scan:        third,
	})
}

// SetWrittenQuery sets the write query stats for the region, splitting the
// total evenly across put, delete and delete-range.
func SetWrittenQuery(v uint64) RegionCreateOption {
	third := v / 3
	return SetQueryStats(&pdpb.QueryStats{
		Put:         third,
		Delete:      third,
		DeleteRange: third,
	})
}

// SetQueryStats sets the query stats for the region.
func SetQueryStats(v *pdpb.QueryStats) RegionCreateOption {
return func(region *RegionInfo) {
Expand Down
Loading

0 comments on commit 5d35e13

Please sign in to comment.