diff --git a/server/core/region_option.go b/server/core/region_option.go
index 5772ca1dc93..368189f4b4e 100644
--- a/server/core/region_option.go
+++ b/server/core/region_option.go
@@ -124,6 +124,13 @@ func WithIncConfVer() RegionCreateOption {
 	}
 }
 
+// WithInterval sets the interval
+func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
+	return func(region *RegionInfo) {
+		region.interval = interval
+	}
+}
+
 // WithDecConfVer decreases the config version of the region.
 func WithDecConfVer() RegionCreateOption {
 	return func(region *RegionInfo) {
diff --git a/server/statistics/hot_peer_cache.go b/server/statistics/hot_peer_cache.go
index fb00e7362de..5695b617034 100644
--- a/server/statistics/hot_peer_cache.go
+++ b/server/statistics/hot_peer_cache.go
@@ -15,6 +15,7 @@ package statistics
 
 import (
 	"math"
+	"sync"
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/metapb"
@@ -54,24 +55,30 @@ var (
 
 // hotPeerCache saves the hot peer's statistics.
 type hotPeerCache struct {
-	kind           FlowKind
-	peersOfStore   map[uint64]*TopN               // storeID -> hot peers
-	storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
+	mu struct {
+		sync.Mutex
+		peersOfStore   map[uint64]*TopN               // storeID -> hot peers
+		storesOfRegion map[uint64]map[uint64]struct{} // regionID -> storeIDs
+	}
+	kind FlowKind
 }
 
 // NewHotStoresStats creates a HotStoresStats
 func NewHotStoresStats(kind FlowKind) *hotPeerCache {
-	return &hotPeerCache{
-		kind:           kind,
-		peersOfStore:   make(map[uint64]*TopN),
-		storesOfRegion: make(map[uint64]map[uint64]struct{}),
+	c := &hotPeerCache{
+		kind: kind,
 	}
+	c.mu.peersOfStore = make(map[uint64]*TopN)
+	c.mu.storesOfRegion = make(map[uint64]map[uint64]struct{})
+	return c
 }
 
 // RegionStats returns hot items
 func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
+	f.mu.Lock()
+	defer f.mu.Unlock()
 	res := make(map[uint64][]*HotPeerStat)
-	for storeID, peers := range f.peersOfStore {
+	for storeID, peers := range f.mu.peersOfStore {
 		values := peers.GetAll()
 		stat := make([]*HotPeerStat, 0, len(values))
 		for _, v := range values {
@@ -86,32 +93,34 @@ func (f *hotPeerCache) RegionStats(minHotDegree int) map[uint64][]*HotPeerStat {
 
 // Update updates the items in statistics.
 func (f *hotPeerCache) Update(item *HotPeerStat) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
 	if item.IsNeedDelete() {
-		if peers, ok := f.peersOfStore[item.StoreID]; ok {
+		if peers, ok := f.mu.peersOfStore[item.StoreID]; ok {
 			peers.Remove(item.RegionID)
 		}
-		if stores, ok := f.storesOfRegion[item.RegionID]; ok {
+		if stores, ok := f.mu.storesOfRegion[item.RegionID]; ok {
 			delete(stores, item.StoreID)
 		}
 	} else {
-		peers, ok := f.peersOfStore[item.StoreID]
+		peers, ok := f.mu.peersOfStore[item.StoreID]
 		if !ok {
 			peers = NewTopN(dimLen, TopNN, topNTTL)
-			f.peersOfStore[item.StoreID] = peers
+			f.mu.peersOfStore[item.StoreID] = peers
 		}
 		peers.Put(item)
-		stores, ok := f.storesOfRegion[item.RegionID]
+		stores, ok := f.mu.storesOfRegion[item.RegionID]
 		if !ok {
 			stores = make(map[uint64]struct{})
-			f.storesOfRegion[item.RegionID] = stores
+			f.mu.storesOfRegion[item.RegionID] = stores
 		}
 		stores[item.StoreID] = struct{}{}
 	}
 }
 
-func (f *hotPeerCache) collectRegionMetrics(byteRate, keyRate float64, interval uint64) {
+func (f *hotPeerCache) collectRegionMetricsLocked(byteRate, keyRate float64, interval uint64) {
 	regionHeartbeatIntervalHist.Observe(float64(interval))
 	if interval == 0 {
 		return
@@ -128,9 +137,11 @@
 
 // CheckRegionFlow checks the flow information of region.
 func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerStat) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
-	bytes := float64(f.getRegionBytes(region))
-	keys := float64(f.getRegionKeys(region))
+	bytes := float64(f.getRegionBytesLocked(region))
+	keys := float64(f.getRegionKeysLocked(region))
 
 	reportInterval := region.GetInterval()
 	interval := reportInterval.GetEndTimestamp() - reportInterval.GetStartTimestamp()
@@ -138,7 +149,7 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
 	byteRate := bytes / float64(interval)
 	keyRate := keys / float64(interval)
 
-	f.collectRegionMetrics(byteRate, keyRate, interval)
+	f.collectRegionMetricsLocked(byteRate, keyRate, interval)
 
 	// old region is in the front and new region is in the back
 	// which ensures it will hit the cache if moving peer or transfer leader occurs with the same replica number
@@ -148,11 +159,11 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
 	}
 
 	var tmpItem *HotPeerStat
-	storeIDs := f.getAllStoreIDs(region)
-	justTransferLeader := f.justTransferLeader(region)
+	storeIDs := f.getAllStoreIDsLocked(region)
+	justTransferLeader := f.justTransferLeaderLocked(region)
 	for _, storeID := range storeIDs {
-		isExpired := f.isRegionExpired(region, storeID) // transfer read leader or remove write peer
-		oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
+		isExpired := f.isRegionExpiredLocked(region, storeID) // transfer read leader or remove write peer
+		oldItem := f.getOldHotPeerStatLocked(region.GetID(), storeID)
 		if isExpired && oldItem != nil { // it may has been moved to other store, we save it to tmpItem
 			tmpItem = oldItem
 		}
@@ -162,7 +173,7 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
 			continue
 		}
 
-		thresholds := f.calcHotThresholds(storeID)
+		thresholds := f.calcHotThresholdsLocked(storeID)
 
 		newItem := &HotPeerStat{
 			StoreID:        storeID,
@@ -184,7 +195,7 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
 				oldItem = tmpItem
 			} else { // new item is new peer after adding replica
 				for _, storeID := range storeIDs {
-					oldItem = f.getOldHotPeerStat(region.GetID(), storeID)
+					oldItem = f.getOldHotPeerStatLocked(region.GetID(), storeID)
 					if oldItem != nil {
 						break
 					}
@@ -192,7 +203,7 @@ func (f *hotPeerCache) CheckRegionFlow(region *core.RegionInfo) (ret []*HotPeerS
 			}
 		}
 
-		newItem = f.updateHotPeerStat(newItem, oldItem, bytes, keys, time.Duration(interval)*time.Second)
+		newItem = f.updateHotPeerStatLocked(newItem, oldItem, bytes, keys, time.Duration(interval)*time.Second)
 		if newItem != nil {
 			ret = append(ret, newItem)
 		}
@@ -208,19 +219,23 @@
 }
 
 func (f *hotPeerCache) IsRegionHot(region *core.RegionInfo, hotDegree int) bool {
+	f.mu.Lock()
+	defer f.mu.Unlock()
 	switch f.kind {
 	case WriteFlow:
-		return f.isRegionHotWithAnyPeers(region, hotDegree)
+		return f.isRegionHotWithAnyPeersLocked(region, hotDegree)
 	case ReadFlow:
-		return f.isRegionHotWithPeer(region, region.GetLeader(), hotDegree)
+		return f.isRegionHotWithPeerLocked(region, region.GetLeader(), hotDegree)
 	}
 	return false
 }
 
 func (f *hotPeerCache) CollectMetrics(typ string) {
-	for storeID, peers := range f.peersOfStore {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	for storeID, peers := range f.mu.peersOfStore {
 		store := storeTag(storeID)
-		thresholds := f.calcHotThresholds(storeID)
+		thresholds := f.calcHotThresholdsLocked(storeID)
 		hotCacheStatusGauge.WithLabelValues("total_length", store, typ).Set(float64(peers.Len()))
 		hotCacheStatusGauge.WithLabelValues("byte-rate-threshold", store, typ).Set(thresholds[byteDim])
 		hotCacheStatusGauge.WithLabelValues("key-rate-threshold", store, typ).Set(thresholds[keyDim])
@@ -229,17 +244,17 @@ func (f *hotPeerCache) CollectMetrics(typ string) {
 	}
 }
 
-func (f *hotPeerCache) getRegionBytes(region *core.RegionInfo) uint64 {
+func (f *hotPeerCache) getRegionBytesLocked(region *core.RegionInfo) uint64 {
 	switch f.kind {
 	case WriteFlow:
 		return region.GetBytesWritten()
 	case ReadFlow:
 		return region.GetBytesRead()
 	}
 	return 0
 }
 
-func (f *hotPeerCache) getRegionKeys(region *core.RegionInfo) uint64 {
+func (f *hotPeerCache) getRegionKeysLocked(region *core.RegionInfo) uint64 {
 	switch f.kind {
 	case WriteFlow:
 		return region.GetKeysWritten()
@@ -249,8 +264,8 @@
 	return 0
 }
 
-func (f *hotPeerCache) getOldHotPeerStat(regionID, storeID uint64) *HotPeerStat {
-	if hotPeers, ok := f.peersOfStore[storeID]; ok {
+func (f *hotPeerCache) getOldHotPeerStatLocked(regionID, storeID uint64) *HotPeerStat {
+	if hotPeers, ok := f.mu.peersOfStore[storeID]; ok {
 		if v := hotPeers.Get(regionID); v != nil {
 			return v.(*HotPeerStat)
 		}
@@ -258,7 +273,7 @@
 	return nil
 }
 
-func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64) bool {
+func (f *hotPeerCache) isRegionExpiredLocked(region *core.RegionInfo, storeID uint64) bool {
 	switch f.kind {
 	case WriteFlow:
 		return region.GetStorePeer(storeID) == nil
@@ -268,9 +283,9 @@ func (f *hotPeerCache) isRegionExpired(region *core.RegionInfo, storeID uint64)
 	return false
 }
 
-func (f *hotPeerCache) calcHotThresholds(storeID uint64) [dimLen]float64 {
+func (f *hotPeerCache) calcHotThresholdsLocked(storeID uint64) [dimLen]float64 {
 	minThresholds := minHotThresholds[f.kind]
-	tn, ok := f.peersOfStore[storeID]
+	tn, ok := f.mu.peersOfStore[storeID]
 	if !ok || tn.Len() < TopNN {
 		return minThresholds
 	}
@@ -285,11 +300,11 @@ func (f *hotPeerCache) calcHotThresholds(storeID uint64) [dimLen]float64 {
 	}
 
 // gets the storeIDs, including old region and new region
-func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
+func (f *hotPeerCache) getAllStoreIDsLocked(region *core.RegionInfo) []uint64 {
 	storeIDs := make(map[uint64]struct{})
 	ret := make([]uint64, 0, len(region.GetPeers()))
 	// old stores
-	ids, ok := f.storesOfRegion[region.GetID()]
+	ids, ok := f.mu.storesOfRegion[region.GetID()]
 	if ok {
 		for storeID := range ids {
 			storeIDs[storeID] = struct{}{}
@@ -311,7 +326,7 @@ func (f *hotPeerCache) getAllStoreIDs(region *core.RegionInfo) []uint64 {
 	return ret
 }
 
-func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool {
+func (f *hotPeerCache) isOldColdPeerLocked(oldItem *HotPeerStat, storeID uint64) bool {
 	isOldPeer := func() bool {
 		for _, id := range oldItem.peers {
 			if id == storeID {
@@ -321,7 +336,7 @@ func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool
 		return false
 	}
 	noInCache := func() bool {
-		ids, ok := f.storesOfRegion[oldItem.RegionID]
+		ids, ok := f.mu.storesOfRegion[oldItem.RegionID]
 		if ok {
 			for id := range ids {
 				if id == storeID {
@@ -334,11 +349,11 @@ func (f *hotPeerCache) isOldColdPeer(oldItem *HotPeerStat, storeID uint64) bool
 	return isOldPeer() && noInCache()
 }
 
-func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo) bool {
-	ids, ok := f.storesOfRegion[region.GetID()]
+func (f *hotPeerCache) justTransferLeaderLocked(region *core.RegionInfo) bool {
+	ids, ok := f.mu.storesOfRegion[region.GetID()]
 	if ok {
 		for storeID := range ids {
-			oldItem := f.getOldHotPeerStat(region.GetID(), storeID)
+			oldItem := f.getOldHotPeerStatLocked(region.GetID(), storeID)
 			if oldItem == nil {
 				continue
 			}
@@ -350,21 +365,21 @@ func (f *hotPeerCache) justTransferLeader(region *core.RegionInfo) bool {
 	return false
 }
 
-func (f *hotPeerCache) isRegionHotWithAnyPeers(region *core.RegionInfo, hotDegree int) bool {
+func (f *hotPeerCache) isRegionHotWithAnyPeersLocked(region *core.RegionInfo, hotDegree int) bool {
 	for _, peer := range region.GetPeers() {
-		if f.isRegionHotWithPeer(region, peer, hotDegree) {
+		if f.isRegionHotWithPeerLocked(region, peer, hotDegree) {
 			return true
 		}
 	}
 	return false
 }
 
-func (f *hotPeerCache) isRegionHotWithPeer(region *core.RegionInfo, peer *metapb.Peer, hotDegree int) bool {
+func (f *hotPeerCache) isRegionHotWithPeerLocked(region *core.RegionInfo, peer *metapb.Peer, hotDegree int) bool {
 	if peer == nil {
 		return false
 	}
 	storeID := peer.GetStoreId()
-	if peers, ok := f.peersOfStore[storeID]; ok {
+	if peers, ok := f.mu.peersOfStore[storeID]; ok {
 		if stat := peers.Get(region.GetID()); stat != nil {
 			return stat.(*HotPeerStat).HotDegree >= hotDegree
 		}
@@ -376,7 +391,7 @@ func (f *hotPeerCache) getDefaultTimeMedian() *movingaverage.TimeMedian {
 	return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, RegionHeartBeatReportInterval*time.Second)
 }
 
-func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, keys float64, interval time.Duration) *HotPeerStat {
+func (f *hotPeerCache) updateHotPeerStatLocked(newItem, oldItem *HotPeerStat, bytes, keys float64, interval time.Duration) *HotPeerStat {
 	if newItem.needDelete {
 		return newItem
 	}
@@ -422,7 +437,7 @@ func (f *hotPeerCache) updateHotPeerStat(newItem, oldItem *HotPeerStat, bytes, k
 		newItem.HotDegree = oldItem.HotDegree
 		newItem.AntiCount = oldItem.AntiCount
 	} else {
-		if f.isOldColdPeer(oldItem, newItem.StoreID) {
+		if f.isOldColdPeerLocked(oldItem, newItem.StoreID) {
 			if newItem.isFullAndHot() {
 				newItem.HotDegree = 1
 				newItem.AntiCount = hotRegionAntiCount
diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go
index d88ecf3ac0b..f837e2908a3 100644
--- a/server/statistics/hot_peer_cache_test.go
+++ b/server/statistics/hot_peer_cache_test.go
@@ -15,18 +15,15 @@ package statistics
 
 import (
 	"math/rand"
-	"testing"
+	"sync"
 	"time"
 
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/metapb"
+	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/tikv/pd/server/core"
 )
 
-func Test(t *testing.T) {
-	TestingT(t)
-}
-
 var _ = Suite(&testHotPeerCache{})
 
 type testHotPeerCache struct{}
@@ -124,7 +121,7 @@ func checkHit(c *C, cache *hotPeerCache, region *core.RegionInfo, kind FlowKind,
 		peers = region.GetPeers()
 	}
 	for _, peer := range peers {
-		item := cache.getOldHotPeerStat(region.GetID(), peer.StoreId)
+		item := cache.getOldHotPeerStatLocked(region.GetID(), peer.StoreId)
 		c.Assert(item, NotNil)
 		c.Assert(item.isNew, Equals, !isHit)
 	}
@@ -218,60 +215,90 @@ func newPeers(n int, pid genID, sid genID) []*metapb.Peer {
 	return peers
 }
 
+func (t *testHotPeerCache) TestUpdateHotPeerStatInConcurrency(c *C) {
+	cache := NewHotStoresStats(ReadFlow)
+	region := core.NewRegionInfo(&metapb.Region{
+		Id: 1,
+		Peers: []*metapb.Peer{
+			{Id: 101, StoreId: 1},
+			{Id: 102, StoreId: 2},
+			{Id: 103, StoreId: 3},
+		},
+	},
+		&metapb.Peer{Id: 101, StoreId: 1},
+	)
+	wg := &sync.WaitGroup{}
+	wg.Add(100)
+	for i := 0; i < 100; i++ {
+		newRegion := region.Clone(
+			core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}),
+			core.SetReadBytes(30000*10),
+			core.SetReadKeys(300000*10))
+		go func(region *core.RegionInfo, wg *sync.WaitGroup) {
+			defer wg.Done()
+			rets := cache.CheckRegionFlow(region)
+			for _, ret := range rets {
+				cache.Update(ret)
+			}
+		}(newRegion, wg)
+	}
+	wg.Wait()
+}
+
 func (t *testHotPeerCache) TestUpdateHotPeerStat(c *C) {
 	cache := NewHotStoresStats(ReadFlow)
 	// skip interval=0
 	newItem := &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}}
-	newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 0)
+	newItem = cache.updateHotPeerStatLocked(newItem, nil, 0, 0, 0)
 	c.Check(newItem, IsNil)
 	// new peer, interval is larger than report interval, but no hot
 	newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{1.0, 1.0}}
-	newItem = cache.updateHotPeerStat(newItem, nil, 0, 0, 60*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, nil, 0, 0, 60*time.Second)
 	c.Check(newItem, IsNil)
 	// new peer, interval is less than report interval
 	newItem = &HotPeerStat{needDelete: false, thresholds: [2]float64{0.0, 0.0}}
-	newItem = cache.updateHotPeerStat(newItem, nil, 60, 60, 30*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, nil, 60, 60, 30*time.Second)
 	c.Check(newItem, NotNil)
 	c.Check(newItem.HotDegree, Equals, 0)
 	c.Check(newItem.AntiCount, Equals, 0)
 	// sum of interval is less than report interval
 	oldItem := newItem
-	newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, oldItem, 60, 60, 10*time.Second)
 	c.Check(newItem.HotDegree, Equals, 0)
 	c.Check(newItem.AntiCount, Equals, 0)
 	// sum of interval is larger than report interval, and hot
 	oldItem = newItem
-	newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 30*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, oldItem, 60, 60, 30*time.Second)
 	c.Check(newItem.HotDegree, Equals, 1)
 	c.Check(newItem.AntiCount, Equals, 2)
 	// sum of interval is less than report interval
 	oldItem = newItem
-	newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 10*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, oldItem, 60, 60, 10*time.Second)
 	c.Check(newItem.HotDegree, Equals, 1)
 	c.Check(newItem.AntiCount, Equals, 2)
 	// sum of interval is larger than report interval, and hot
 	oldItem = newItem
-	newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 50*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, oldItem, 60, 60, 50*time.Second)
 	c.Check(newItem.HotDegree, Equals, 2)
 	c.Check(newItem.AntiCount, Equals, 2)
 	// sum of interval is larger than report interval, and cold
 	oldItem = newItem
 	newItem.thresholds = [2]float64{10.0, 10.0}
-	newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, oldItem, 60, 60, 60*time.Second)
 	c.Check(newItem.HotDegree, Equals, 1)
 	c.Check(newItem.AntiCount, Equals, 1)
 	// sum of interval is larger than report interval, and cold
 	oldItem = newItem
-	newItem = cache.updateHotPeerStat(newItem, oldItem, 60, 60, 60*time.Second)
+	newItem = cache.updateHotPeerStatLocked(newItem, oldItem, 60, 60, 60*time.Second)
 	c.Check(newItem.HotDegree, Equals, 0)
 	c.Check(newItem.AntiCount, Equals, 0)
 	c.Check(newItem.needDelete, Equals, true)
 }
 
-func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStat(c *C) {
+func (t *testHotPeerCache) TestThresholdWithUpdateHotPeerStatLocked(c *C) {
 	byteRate := minHotThresholds[ReadFlow][byteDim] * 2
 	expectThreshold := byteRate * HotThresholdRatio
 	t.testMetrics(c, 120., byteRate, expectThreshold)
@@ -288,7 +315,7 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
 	for i := uint64(1); i < TopNN+10; i++ {
 		var oldItem *HotPeerStat
 		for {
-			thresholds := cache.calcHotThresholds(storeID)
+			thresholds := cache.calcHotThresholdsLocked(storeID)
 			newItem := &HotPeerStat{
 				StoreID:    storeID,
 				RegionID:   i,
@@ -297,14 +324,14 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold
 				ByteRate:   byteRate,
 				KeyRate:    0,
 			}
-			oldItem = cache.getOldHotPeerStat(i, storeID)
+			oldItem = cache.getOldHotPeerStatLocked(i, storeID)
 			if oldItem != nil && oldItem.rollingByteRate.isHot(thresholds) == true {
 				break
 			}
-			item := cache.updateHotPeerStat(newItem, oldItem, byteRate*interval, 0, time.Duration(interval)*time.Second)
+			item := cache.updateHotPeerStatLocked(newItem, oldItem, byteRate*interval, 0, time.Duration(interval)*time.Second)
 			cache.Update(item)
 		}
-		thresholds := cache.calcHotThresholds(storeID)
+		thresholds := cache.calcHotThresholdsLocked(storeID)
 		if i < TopNN {
 			c.Assert(thresholds[byteDim], Equals, minThresholds[byteDim])
 		} else {
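
The locking scheme in the patch follows a common Go idiom: sync.Mutex is embedded in an anonymous struct together with the maps it guards, so every access to peersOfStore and storesOfRegion has to be spelled as f.mu.<field>, which makes a missing Lock easy to spot. The exported entry points (CheckRegionFlow, Update, RegionStats, IsRegionHot, CollectMetrics) acquire the lock, while the renamed *Locked helpers assume the caller already holds it. Below is a minimal, self-contained sketch of that pattern; statCache and its members are illustrative names only and are not part of the PD codebase.

package main

import (
	"fmt"
	"sync"
)

// statCache mirrors the hotPeerCache layout: the mutex and the map it
// protects live in one anonymous struct, so the map cannot be reached
// without going through c.mu.
type statCache struct {
	mu struct {
		sync.Mutex
		counts map[uint64]int // regionID -> observation count
	}
}

func newStatCache() *statCache {
	c := &statCache{}
	c.mu.counts = make(map[uint64]int)
	return c
}

// Inc is an exported entry point: it takes the lock itself.
func (c *statCache) Inc(regionID uint64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.incLocked(regionID)
}

// incLocked assumes the caller already holds c.mu, like the *Locked helpers in the patch.
func (c *statCache) incLocked(regionID uint64) {
	c.mu.counts[regionID]++
}

// Get takes the lock before reading the guarded map.
func (c *statCache) Get(regionID uint64) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.mu.counts[regionID]
}

func main() {
	c := newStatCache()
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.Inc(1) // safe to call concurrently; no data race on the map
		}()
	}
	wg.Wait()
	fmt.Println(c.Get(1)) // prints 100
}

Run under the race detector (go test -race for PD, or go run -race for this sketch), concurrent callers no longer trip a data race on the maps; that is exactly what the new TestUpdateHotPeerStatInConcurrency case exercises by calling CheckRegionFlow and Update from 100 goroutines.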