diff --git a/server/core/region_option.go b/server/core/region_option.go index e84eefc7d56..1292ec89965 100644 --- a/server/core/region_option.go +++ b/server/core/region_option.go @@ -269,3 +269,10 @@ func WithReplacePeerStore(oldStoreID, newStoreID uint64) RegionCreateOption { } } } + +// WithInterval sets the interval +func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption { + return func(region *RegionInfo) { + region.interval = interval + } +} diff --git a/server/statistics/avg_over_time.go b/server/statistics/avg_over_time.go index cb58e445a49..894c2502daa 100644 --- a/server/statistics/avg_over_time.go +++ b/server/statistics/avg_over_time.go @@ -15,8 +15,6 @@ package statistics import ( "time" - - "github.com/phf/go-queue/queue" ) const ( @@ -35,7 +33,7 @@ type deltaWithInterval struct { // stores recent changes that happened in the last avgInterval, // then calculates the change rate by (sum of changes) / (sum of intervals). type AvgOverTime struct { - que *queue.Queue + que *SafeQueue deltaSum float64 intervalSum time.Duration avgInterval time.Duration @@ -44,7 +42,7 @@ type AvgOverTime struct { // NewAvgOverTime returns an AvgOverTime with given interval. func NewAvgOverTime(interval time.Duration) *AvgOverTime { return &AvgOverTime{ - que: queue.New(), + que: NewSafeQueue(), deltaSum: 0, intervalSum: 0, avgInterval: interval, @@ -58,7 +56,7 @@ func (aot *AvgOverTime) Get() float64 { // Clear clears the AvgOverTime. func (aot *AvgOverTime) Clear() { - aot.que = queue.New() + aot.que.Init() aot.intervalSum = 0 aot.deltaSum = 0 } diff --git a/server/statistics/queue.go b/server/statistics/queue.go new file mode 100644 index 00000000000..f61fa023e8a --- /dev/null +++ b/server/statistics/queue.go @@ -0,0 +1,54 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + "sync" + + "github.com/phf/go-queue/queue" +) + +// SafeQueue is a concurrency safe queue +type SafeQueue struct { + mu sync.Mutex + que *queue.Queue +} + +// NewSafeQueue return a SafeQueue +func NewSafeQueue() *SafeQueue { + sq := &SafeQueue{} + sq.que = queue.New() + return sq +} + +// Init implement init +func (sq *SafeQueue) Init() { + sq.mu.Lock() + defer sq.mu.Unlock() + sq.que.Init() +} + +// PushBack implement PushBack +func (sq *SafeQueue) PushBack(v interface{}) { + sq.mu.Lock() + defer sq.mu.Unlock() + sq.que.PushBack(v) +} + +// PopFront implement PopFront +func (sq *SafeQueue) PopFront() interface{} { + sq.mu.Lock() + defer sq.mu.Unlock() + return sq.que.PopFront() +} diff --git a/server/statistics/queue_test.go b/server/statistics/queue_test.go new file mode 100644 index 00000000000..455f1002ce9 --- /dev/null +++ b/server/statistics/queue_test.go @@ -0,0 +1,28 @@ +// Copyright 2021 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package statistics + +import ( + . "github.com/pingcap/check" +) + +func (t *testMovingAvg) TestQueue(c *C) { + sq := NewSafeQueue() + sq.PushBack(1) + sq.PushBack(2) + v1 := sq.PopFront() + v2 := sq.PopFront() + c.Assert(1, Equals, v1.(int)) + c.Assert(2, Equals, v2.(int)) +}