diff --git a/pkg/mock/mockcluster/mockcluster.go b/pkg/mock/mockcluster/mockcluster.go index 8036f9d926d..fedba9debc6 100644 --- a/pkg/mock/mockcluster/mockcluster.go +++ b/pkg/mock/mockcluster/mockcluster.go @@ -14,6 +14,7 @@ package mockcluster import ( + "context" "fmt" "strconv" "time" @@ -53,10 +54,11 @@ type Cluster struct { // NewCluster creates a new Cluster func NewCluster(opts *config.PersistOptions) *Cluster { + ctx := context.Background() clus := &Cluster{ BasicCluster: core.NewBasicCluster(), IDAllocator: mockid.NewIDAllocator(), - HotStat: statistics.NewHotStat(), + HotStat: statistics.NewHotStat(ctx), PersistOptions: opts, suspectRegions: map[uint64]struct{}{}, disabledFeatures: make(map[versioninfo.Feature]struct{}), @@ -339,7 +341,7 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo( var items []*statistics.HotPeerStat for i := 0; i < filledNum; i++ { - items = mc.HotCache.CheckRead(r) + items = mc.HotCache.CheckReadSync(r) for _, item := range items { mc.HotCache.Update(item) } @@ -366,7 +368,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo( var items []*statistics.HotPeerStat for i := 0; i < filledNum; i++ { - items = mc.HotCache.CheckWrite(r) + items = mc.HotCache.CheckWriteSync(r) for _, item := range items { mc.HotCache.Update(item) } diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 2b62f6c8df2..7117fb68588 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -205,7 +205,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s c.storage = storage c.id = id c.labelLevelStats = statistics.NewLabelStatistics() - c.hotStat = statistics.NewHotStat() + c.hotStat = statistics.NewHotStat(c.ctx) c.prepareChecker = newPrepareChecker() c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit) c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute) @@ -546,8 +546,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.RUnlock() return err } - writeItems := c.CheckWriteStatus(region) - readItems := c.CheckReadStatus(region) + c.CheckWriteStatus(region) + c.CheckReadStatus(region) c.RUnlock() // Save to storage if meta is updated. @@ -623,7 +623,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { } } - if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew { + if !saveKV && !saveCache && !isNew { return nil } @@ -683,12 +683,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error { c.regionStats.Observe(region, c.getRegionStoresLocked(region)) } - for _, writeItem := range writeItems { - c.hotStat.Update(writeItem) - } - for _, readItem := range readItems { - c.hotStat.Update(readItem) - } + //for _, writeItem := range writeItems { + // c.hotStat.Update(writeItem) + //} + //for _, readItem := range readItems { + // c.hotStat.Update(readItem) + //} c.Unlock() // If there are concurrent heartbeats from the same region, the last write will win even if @@ -1475,14 +1475,14 @@ func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat { return c.hotStat.RegionStats(statistics.WriteFlow, c.GetOpts().GetHotRegionCacheHitsThreshold()) } -// CheckWriteStatus checks the write status, returns whether need update statistics and item. -func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat { - return c.hotStat.CheckWrite(region) +// CheckWriteStatus checks the write status. +func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) { + c.hotStat.CheckWriteAsync(region) } -// CheckReadStatus checks the read status, returns whether need update statistics and item. -func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat { - return c.hotStat.CheckRead(region) +// CheckReadStatus checks the read status. +func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) { + c.hotStat.CheckReadAsync(region) } // TODO: remove me. diff --git a/server/statistics/hot_cache.go b/server/statistics/hot_cache.go index c175fe34bfa..67c741d0d8a 100644 --- a/server/statistics/hot_cache.go +++ b/server/statistics/hot_cache.go @@ -14,6 +14,7 @@ package statistics import ( + "context" "math/rand" "github.com/tikv/pd/server/core" @@ -23,30 +24,51 @@ import ( // only turned off by the simulator and the test. var Denoising = true +const queueCap = 1000 + // HotCache is a cache hold hot regions. type HotCache struct { - writeFlow *hotPeerCache - readFlow *hotPeerCache + writeFlowQueue chan *core.RegionInfo + readFlowQueue chan *core.RegionInfo + writeFlow *hotPeerCache + readFlow *hotPeerCache } // NewHotCache creates a new hot spot cache. -func NewHotCache() *HotCache { - return &HotCache{ - writeFlow: NewHotStoresStats(WriteFlow), - readFlow: NewHotStoresStats(ReadFlow), +func NewHotCache(ctx context.Context) *HotCache { + w := &HotCache{ + writeFlowQueue: make(chan *core.RegionInfo, queueCap), + readFlowQueue: make(chan *core.RegionInfo, queueCap), + writeFlow: NewHotStoresStats(WriteFlow), + readFlow: NewHotStoresStats(ReadFlow), } + go w.updateReadItems(ctx) + go w.updateWriteItems(ctx) + return w } -// CheckWrite checks the write status, returns update items. -func (w *HotCache) CheckWrite(region *core.RegionInfo) []*HotPeerStat { +// CheckWriteSync checks the write status, returns update items. +// This is used for mockcluster. +func (w *HotCache) CheckWriteSync(region *core.RegionInfo) []*HotPeerStat { return w.writeFlow.CheckRegionFlow(region) } -// CheckRead checks the read status, returns update items. -func (w *HotCache) CheckRead(region *core.RegionInfo) []*HotPeerStat { +// CheckReadSync checks the read status, returns update items. +// This is used for mockcluster. +func (w *HotCache) CheckReadSync(region *core.RegionInfo) []*HotPeerStat { return w.readFlow.CheckRegionFlow(region) } +// CheckWriteAsync puts the region into queue, and check it asynchronously +func (w *HotCache) CheckWriteAsync(region *core.RegionInfo) { + w.writeFlowQueue <- region +} + +// CheckReadAsync puts the region into queue, and check it asynchronously +func (w *HotCache) CheckReadAsync(region *core.RegionInfo) { + w.readFlowQueue <- region +} + // Update updates the cache. func (w *HotCache) Update(item *HotPeerStat) { switch item.Kind { @@ -121,3 +143,37 @@ func (w *HotCache) GetFilledPeriod(kind FlowKind) int { } return 0 } + +func (w *HotCache) updateReadItems(ctx context.Context) { + for { + select { + case <-ctx.Done(): + close(w.readFlowQueue) + return + case region, ok := <-w.readFlowQueue: + if ok && region != nil { + items := w.readFlow.CheckRegionFlow(region) + for _, item := range items { + w.Update(item) + } + } + } + } +} + +func (w *HotCache) updateWriteItems(ctx context.Context) { + for { + select { + case <-ctx.Done(): + close(w.writeFlowQueue) + return + case region, ok := <-w.writeFlowQueue: + if ok && region != nil { + items := w.writeFlow.CheckRegionFlow(region) + for _, item := range items { + w.Update(item) + } + } + } + } +} diff --git a/server/statistics/hotstat.go b/server/statistics/hotstat.go index e122e5bf6a9..50a97285b90 100644 --- a/server/statistics/hotstat.go +++ b/server/statistics/hotstat.go @@ -13,6 +13,8 @@ package statistics +import "context" + // HotStat contains cluster's hotspot statistics. type HotStat struct { *HotCache @@ -20,9 +22,9 @@ type HotStat struct { } // NewHotStat creates the container to hold cluster's hotspot statistics. -func NewHotStat() *HotStat { +func NewHotStat(ctx context.Context) *HotStat { return &HotStat{ - HotCache: NewHotCache(), + HotCache: NewHotCache(ctx), StoresStats: NewStoresStats(), } }