asynchronous check
Signed-off-by: yisaer <disxiaofei@163.com>
Yisaer committed May 8, 2021
1 parent a002e18 commit 04de5a9
Showing 4 changed files with 91 additions and 31 deletions.
8 changes: 5 additions & 3 deletions pkg/mock/mockcluster/mockcluster.go
@@ -14,6 +14,7 @@
package mockcluster

import (
"context"
"fmt"
"strconv"
"time"
@@ -53,10 +54,11 @@ type Cluster struct {

// NewCluster creates a new Cluster
func NewCluster(opts *config.PersistOptions) *Cluster {
ctx := context.Background()
clus := &Cluster{
BasicCluster: core.NewBasicCluster(),
IDAllocator: mockid.NewIDAllocator(),
HotStat: statistics.NewHotStat(),
HotStat: statistics.NewHotStat(ctx),
PersistOptions: opts,
suspectRegions: map[uint64]struct{}{},
disabledFeatures: make(map[versioninfo.Feature]struct{}),
@@ -339,7 +341,7 @@ func (mc *Cluster) AddLeaderRegionWithReadInfo(

var items []*statistics.HotPeerStat
for i := 0; i < filledNum; i++ {
items = mc.HotCache.CheckRead(r)
items = mc.HotCache.CheckReadSync(r)
for _, item := range items {
mc.HotCache.Update(item)
}
@@ -366,7 +368,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(

var items []*statistics.HotPeerStat
for i := 0; i < filledNum; i++ {
items = mc.HotCache.CheckWrite(r)
items = mc.HotCache.CheckWriteSync(r)
for _, item := range items {
mc.HotCache.Update(item)
}
32 changes: 16 additions & 16 deletions server/cluster/cluster.go
@@ -205,7 +205,7 @@ func (c *RaftCluster) InitCluster(id id.Allocator, opt *config.PersistOptions, s
c.storage = storage
c.id = id
c.labelLevelStats = statistics.NewLabelStatistics()
c.hotStat = statistics.NewHotStat()
c.hotStat = statistics.NewHotStat(c.ctx)
c.prepareChecker = newPrepareChecker()
c.changedRegions = make(chan *core.RegionInfo, defaultChangedRegionsLimit)
c.suspectRegions = cache.NewIDTTL(c.ctx, time.Minute, 3*time.Minute)
@@ -546,8 +546,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.RUnlock()
return err
}
writeItems := c.CheckWriteStatus(region)
readItems := c.CheckReadStatus(region)
c.CheckWriteStatus(region)
c.CheckReadStatus(region)
c.RUnlock()

// Save to storage if meta is updated.
@@ -623,7 +623,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
}
}

if len(writeItems) == 0 && len(readItems) == 0 && !saveKV && !saveCache && !isNew {
if !saveKV && !saveCache && !isNew {
return nil
}

@@ -683,12 +683,12 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
c.regionStats.Observe(region, c.getRegionStoresLocked(region))
}

for _, writeItem := range writeItems {
c.hotStat.Update(writeItem)
}
for _, readItem := range readItems {
c.hotStat.Update(readItem)
}
//for _, writeItem := range writeItems {
// c.hotStat.Update(writeItem)
//}
//for _, readItem := range readItems {
// c.hotStat.Update(readItem)
//}
c.Unlock()

// If there are concurrent heartbeats from the same region, the last write will win even if
@@ -1475,14 +1475,14 @@ func (c *RaftCluster) RegionWriteStats() map[uint64][]*statistics.HotPeerStat {
return c.hotStat.RegionStats(statistics.WriteFlow, c.GetOpts().GetHotRegionCacheHitsThreshold())
}

// CheckWriteStatus checks the write status, returns whether need update statistics and item.
func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotStat.CheckWrite(region)
// CheckWriteStatus checks the write status.
func (c *RaftCluster) CheckWriteStatus(region *core.RegionInfo) {
c.hotStat.CheckWriteAsync(region)
}

// CheckReadStatus checks the read status, returns whether need update statistics and item.
func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) []*statistics.HotPeerStat {
return c.hotStat.CheckRead(region)
// CheckReadStatus checks the read status.
func (c *RaftCluster) CheckReadStatus(region *core.RegionInfo) {
c.hotStat.CheckReadAsync(region)
}

// TODO: remove me.
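
Taken together, the cluster.go changes turn the hot-statistics check in the heartbeat path into a fire-and-forget call: processRegionHeartbeat no longer collects the returned HotPeerStat items, no longer applies them under the cluster lock, and no longer lets them influence the early-return condition. A minimal, self-contained sketch of that caller-side shape, using hypothetical stand-in names (regionInfo, hotStat, processHeartbeat) rather than the repository's real types:

package main

import "fmt"

// regionInfo is an illustrative stand-in for core.RegionInfo.
type regionInfo struct{ id uint64 }

// hotStat is an illustrative stand-in for the cluster's hot statistics holder.
type hotStat struct{ writeQueue chan *regionInfo }

// checkWriteAsync enqueues the region; the caller gets no items back.
func (h *hotStat) checkWriteAsync(r *regionInfo) { h.writeQueue <- r }

// processHeartbeat sketches the new shape of processRegionHeartbeat: the hot
// check is fire-and-forget, so the early return depends only on
// saveKV/saveCache/isNew, not on returned hot peer items.
func processHeartbeat(h *hotStat, r *regionInfo, saveKV, saveCache, isNew bool) {
	h.checkWriteAsync(r) // previously: writeItems := h.CheckWrite(r)
	if !saveKV && !saveCache && !isNew {
		return
	}
	fmt.Printf("persist region %d\n", r.id)
}

func main() {
	h := &hotStat{writeQueue: make(chan *regionInfo, 8)}
	processHeartbeat(h, &regionInfo{id: 1}, true, false, false)
}
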
76 changes: 66 additions & 10 deletions server/statistics/hot_cache.go
@@ -14,6 +14,7 @@
package statistics

import (
"context"
"math/rand"

"github.com/tikv/pd/server/core"
@@ -23,30 +24,51 @@ import (
// only turned off by the simulator and the test.
var Denoising = true

const queueCap = 1000

// HotCache is a cache that holds hot regions.
type HotCache struct {
writeFlow *hotPeerCache
readFlow *hotPeerCache
writeFlowQueue chan *core.RegionInfo
readFlowQueue chan *core.RegionInfo
writeFlow *hotPeerCache
readFlow *hotPeerCache
}

// NewHotCache creates a new hot spot cache.
func NewHotCache() *HotCache {
return &HotCache{
writeFlow: NewHotStoresStats(WriteFlow),
readFlow: NewHotStoresStats(ReadFlow),
func NewHotCache(ctx context.Context) *HotCache {
w := &HotCache{
writeFlowQueue: make(chan *core.RegionInfo, queueCap),
readFlowQueue: make(chan *core.RegionInfo, queueCap),
writeFlow: NewHotStoresStats(WriteFlow),
readFlow: NewHotStoresStats(ReadFlow),
}
go w.updateReadItems(ctx)
go w.updateWriteItems(ctx)
return w
}

// CheckWrite checks the write status, returns update items.
func (w *HotCache) CheckWrite(region *core.RegionInfo) []*HotPeerStat {
// CheckWriteSync checks the write status, returns update items.
// This is used for mockcluster.
func (w *HotCache) CheckWriteSync(region *core.RegionInfo) []*HotPeerStat {
return w.writeFlow.CheckRegionFlow(region)
}

// CheckRead checks the read status, returns update items.
func (w *HotCache) CheckRead(region *core.RegionInfo) []*HotPeerStat {
// CheckReadSync checks the read status, returns update items.
// This is used for mockcluster.
func (w *HotCache) CheckReadSync(region *core.RegionInfo) []*HotPeerStat {
return w.readFlow.CheckRegionFlow(region)
}

// CheckWriteAsync puts the region into the queue and checks it asynchronously.
func (w *HotCache) CheckWriteAsync(region *core.RegionInfo) {
w.writeFlowQueue <- region
}

// CheckReadAsync puts the region into the queue and checks it asynchronously.
func (w *HotCache) CheckReadAsync(region *core.RegionInfo) {
w.readFlowQueue <- region
}

// Update updates the cache.
func (w *HotCache) Update(item *HotPeerStat) {
switch item.Kind {
@@ -121,3 +143,37 @@ func (w *HotCache) GetFilledPeriod(kind FlowKind) int {
}
return 0
}

func (w *HotCache) updateReadItems(ctx context.Context) {
for {
select {
case <-ctx.Done():
close(w.readFlowQueue)
return
case region, ok := <-w.readFlowQueue:
if ok && region != nil {
items := w.readFlow.CheckRegionFlow(region)
for _, item := range items {
w.Update(item)
}
}
}
}
}

func (w *HotCache) updateWriteItems(ctx context.Context) {
for {
select {
case <-ctx.Done():
close(w.writeFlowQueue)
return
case region, ok := <-w.writeFlowQueue:
if ok && region != nil {
items := w.writeFlow.CheckRegionFlow(region)
for _, item := range items {
w.Update(item)
}
}
}
}
}
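
The new code in hot_cache.go is a bounded-queue worker: CheckWriteAsync and CheckReadAsync enqueue regions onto buffered channels of capacity queueCap, and one background goroutine per flow drains its queue, runs CheckRegionFlow, and applies the resulting items until the context is cancelled. A minimal, runnable sketch of the same pattern, assuming hypothetical stand-in types (asyncCache, regionInfo, peerStat) in place of the repository's real ones:

package main

import (
	"context"
	"fmt"
	"time"
)

// regionInfo and peerStat are illustrative stand-ins for core.RegionInfo and HotPeerStat.
type regionInfo struct{ id uint64 }
type peerStat struct{ regionID uint64 }

// asyncCache mirrors the shape of HotCache after this commit: a bounded queue
// fed by async check calls and drained by a single background goroutine.
type asyncCache struct {
	queue chan *regionInfo
}

func newAsyncCache(ctx context.Context, capacity int) *asyncCache {
	c := &asyncCache{queue: make(chan *regionInfo, capacity)}
	go c.loop(ctx)
	return c
}

// checkAsync enqueues a region and returns immediately, like CheckReadAsync.
func (c *asyncCache) checkAsync(r *regionInfo) {
	c.queue <- r
}

// loop drains the queue until the context is cancelled, like updateReadItems.
func (c *asyncCache) loop(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case r, ok := <-c.queue:
			if ok && r != nil {
				// checkRegionFlow stands in for hotPeerCache.CheckRegionFlow.
				for _, item := range checkRegionFlow(r) {
					fmt.Printf("update hot peer stat for region %d\n", item.regionID)
				}
			}
		}
	}
}

func checkRegionFlow(r *regionInfo) []*peerStat {
	return []*peerStat{{regionID: r.id}}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cache := newAsyncCache(ctx, 1000)
	cache.checkAsync(&regionInfo{id: 1})
	time.Sleep(10 * time.Millisecond) // give the background goroutine time to drain
}

One property worth keeping in mind: a send on a full buffered channel blocks, so once the queue backs up, the asynchronous check call can still stall the caller.
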
6 changes: 4 additions & 2 deletions server/statistics/hotstat.go
@@ -13,16 +13,18 @@

package statistics

import "context"

// HotStat contains cluster's hotspot statistics.
type HotStat struct {
*HotCache
*StoresStats
}

// NewHotStat creates the container to hold cluster's hotspot statistics.
func NewHotStat() *HotStat {
func NewHotStat(ctx context.Context) *HotStat {
return &HotStat{
HotCache: NewHotCache(),
HotCache: NewHotCache(ctx),
StoresStats: NewStoresStats(),
}
}
