Skip to content

Commit

Permalink
scheduler: hotspot: solution-based key-rate-aware balance solver (#2141)
Browse files Browse the repository at this point in the history
  • Loading branch information
Luffbee authored Feb 26, 2020
1 parent 904e833 commit c8589fc
Show file tree
Hide file tree
Showing 7 changed files with 1,120 additions and 501 deletions.
40 changes: 38 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,14 @@ func (mc *Cluster) AddLeaderRegionWithRange(regionID uint64, startKey string, en
}

// AddLeaderRegionWithReadInfo adds region with specified leader, followers and read info.
func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64, readBytes uint64, reportInterval uint64, followerIds ...uint64) {
func (mc *Cluster) AddLeaderRegionWithReadInfo(
regionID uint64, leaderID uint64,
readBytes, readKeys uint64,
reportInterval uint64,
followerIds []uint64) {
r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
r = r.Clone(core.SetReadBytes(readBytes))
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
items := mc.HotCache.CheckRead(r, mc.StoresStats)
for _, item := range items {
Expand All @@ -282,9 +287,14 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(regionID uint64, leaderID uint64,
}

// AddLeaderRegionWithWriteInfo adds region with specified leader, followers and write info.
func (mc *Cluster) AddLeaderRegionWithWriteInfo(regionID uint64, leaderID uint64, writtenBytes uint64, reportInterval uint64, followerIds ...uint64) {
func (mc *Cluster) AddLeaderRegionWithWriteInfo(
regionID uint64, leaderID uint64,
writtenBytes, writtenKeys uint64,
reportInterval uint64,
followerIds []uint64) {
r := mc.newMockRegionInfo(regionID, leaderID, followerIds...)
r = r.Clone(core.SetWrittenBytes(writtenBytes))
r = r.Clone(core.SetWrittenKeys(writtenKeys))
r = r.Clone(core.SetReportInterval(reportInterval))
items := mc.HotCache.CheckWrite(r, mc.StoresStats)
for _, item := range items {
Expand Down Expand Up @@ -404,6 +414,32 @@ func (mc *Cluster) UpdateStorageReadBytes(storeID uint64, bytesRead uint64) {
mc.PutStore(newStore)
}

// UpdateStorageWrittenKeys updates store written keys.
func (mc *Cluster) UpdateStorageWrittenKeys(storeID uint64, keysWritten uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysWritten = keysWritten
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}

// UpdateStorageReadKeys updates store read bytes.
func (mc *Cluster) UpdateStorageReadKeys(storeID uint64, keysRead uint64) {
store := mc.GetStore(storeID)
newStats := proto.Clone(store.GetStoreStats()).(*pdpb.StoreStats)
newStats.KeysRead = keysRead
now := time.Now().Second()
interval := &pdpb.TimeInterval{StartTimestamp: uint64(now - statistics.StoreHeartBeatReportInterval), EndTimestamp: uint64(now)}
newStats.Interval = interval
newStore := store.Clone(core.SetStoreStats(newStats))
mc.Set(storeID, newStats)
mc.PutStore(newStore)
}

// UpdateStoreStatus updates store status.
func (mc *Cluster) UpdateStoreStatus(id uint64) {
leaderCount := mc.Regions.GetStoreLeaderCount(id)
Expand Down
4 changes: 4 additions & 0 deletions server/cluster/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ func (c *coordinator) collectHotSpotMetrics() {
infl := pendings[storeID]
// TODO: add to tidb-ansible after merging pending influence into operator influence.
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "write_pending_influence_count").Set(infl.Count)
}

// Collects hot read region metrics.
Expand All @@ -440,6 +442,8 @@ func (c *coordinator) collectHotSpotMetrics() {

infl := pendings[storeID]
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_byte_rate").Set(infl.ByteRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_key_rate").Set(infl.KeyRate)
hotSpotStatusGauge.WithLabelValues(storeAddress, storeLabel, "read_pending_influence_count").Set(infl.Count)
}
}

Expand Down
Loading

0 comments on commit c8589fc

Please sign in to comment.