diff --git a/pkg/movingaverage/queue.go b/pkg/movingaverage/queue.go new file mode 100644 index 00000000000..04a1a159195 --- /dev/null +++ b/pkg/movingaverage/queue.go @@ -0,0 +1,54 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package movingaverage + +import ( + "sync" + + "github.com/phf/go-queue/queue" +) + +// SafeQueue is a concurrency safe queue +type SafeQueue struct { + mu sync.Mutex + que *queue.Queue +} + +// NewSafeQueue return a SafeQueue +func NewSafeQueue() *SafeQueue { + sq := &SafeQueue{} + sq.que = queue.New() + return sq +} + +// Init implement init +func (sq *SafeQueue) Init() { + sq.mu.Lock() + defer sq.mu.Unlock() + sq.que.Init() +} + +// PushBack implement PushBack +func (sq *SafeQueue) PushBack(v interface{}) { + sq.mu.Lock() + defer sq.mu.Unlock() + sq.que.PushBack(v) +} + +// PopFront implement PopFront +func (sq *SafeQueue) PopFront() interface{} { + sq.mu.Lock() + defer sq.mu.Unlock() + return sq.que.PopFront() +} diff --git a/pkg/movingaverage/queue_test.go b/pkg/movingaverage/queue_test.go new file mode 100644 index 00000000000..84b51488f9f --- /dev/null +++ b/pkg/movingaverage/queue_test.go @@ -0,0 +1,28 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package movingaverage + +import ( + . "github.com/pingcap/check" +) + +func (t *testMovingAvg) TestQueue(c *C) { + sq := NewSafeQueue() + sq.PushBack(1) + sq.PushBack(2) + v1 := sq.PopFront() + v2 := sq.PopFront() + c.Assert(1, Equals, v1.(int)) + c.Assert(2, Equals, v2.(int)) +} diff --git a/server/core/region_option.go b/server/core/region_option.go index d1b28c0bdbc..e201ff691c4 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -269,3 +269,10 @@ func WithReplacePeerStore(oldStoreID, newStoreID uint64) RegionCreateOption { } } } + +// WithInterval sets the interval +func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption { + return func(region *RegionInfo) { + region.interval = interval + } +} diff --git a/server/statistics/hot_peer_cache_test.go b/server/statistics/hot_peer_cache_test.go index b61b33a4347..605d36423dd 100644 --- a/server/statistics/hot_peer_cache_test.go +++ b/server/statistics/hot_peer_cache_test.go @@ -15,10 +15,12 @@ package statistics import ( "math/rand" + "testing" "time" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/server/core" ) @@ -307,3 +309,28 @@ func (t *testHotPeerCache) testMetrics(c *C, interval, byteRate, expectThreshold } } } + +func BenchmarkCheckRegionFlow(b *testing.B) { + cache := NewHotStoresStats(ReadFlow) + region := core.NewRegionInfo(&metapb.Region{ + Id: 1, + Peers: []*metapb.Peer{ + {Id: 101, StoreId: 1}, + {Id: 102, StoreId: 2}, + {Id: 103, StoreId: 3}, + }, + }, + &metapb.Peer{Id: 101, StoreId: 1}, + ) + newRegion := region.Clone( + core.WithInterval(&pdpb.TimeInterval{StartTimestamp: 0, EndTimestamp: 10}), + core.SetReadBytes(30000*10), + core.SetReadKeys(300000*10)) + b.ResetTimer() + for i := 0; i < b.N; i++ { + rets := cache.CheckRegionFlow(newRegion) + for _, ret := range rets { + cache.Update(ret) + } + } +}