Skip to content

Commit

Permalink
movingaverage: support concurrency safe queue in AvgOverTime (#3506) (#…
Browse files Browse the repository at this point in the history
…3509)

* cherry pick #3506 to release-4.0

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>

* fix conflict

Signed-off-by: yisaer <disxiaofei@163.com>

* use init

Signed-off-by: yisaer <disxiaofei@163.com>

Co-authored-by: Song Gao <disxiaofei@163.com>
Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com>
  • Loading branch information
3 people authored Mar 25, 2021
1 parent cd3b13e commit 05b9d6d
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 5 deletions.
7 changes: 7 additions & 0 deletions server/core/region_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,3 +269,10 @@ func WithReplacePeerStore(oldStoreID, newStoreID uint64) RegionCreateOption {
}
}
}

// WithInterval sets the interval
func WithInterval(interval *pdpb.TimeInterval) RegionCreateOption {
return func(region *RegionInfo) {
region.interval = interval
}
}
8 changes: 3 additions & 5 deletions server/statistics/avg_over_time.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ package statistics

import (
"time"

"github.com/phf/go-queue/queue"
)

const (
Expand All @@ -35,7 +33,7 @@ type deltaWithInterval struct {
// stores recent changes that happened in the last avgInterval,
// then calculates the change rate by (sum of changes) / (sum of intervals).
type AvgOverTime struct {
que *queue.Queue
que *SafeQueue
deltaSum float64
intervalSum time.Duration
avgInterval time.Duration
Expand All @@ -44,7 +42,7 @@ type AvgOverTime struct {
// NewAvgOverTime returns an AvgOverTime with given interval.
func NewAvgOverTime(interval time.Duration) *AvgOverTime {
return &AvgOverTime{
que: queue.New(),
que: NewSafeQueue(),
deltaSum: 0,
intervalSum: 0,
avgInterval: interval,
Expand All @@ -58,7 +56,7 @@ func (aot *AvgOverTime) Get() float64 {

// Clear clears the AvgOverTime.
func (aot *AvgOverTime) Clear() {
aot.que = queue.New()
aot.que.Init()
aot.intervalSum = 0
aot.deltaSum = 0
}
Expand Down
54 changes: 54 additions & 0 deletions server/statistics/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
"sync"

"github.com/phf/go-queue/queue"
)

// SafeQueue is a concurrency safe queue
type SafeQueue struct {
mu sync.Mutex
que *queue.Queue
}

// NewSafeQueue return a SafeQueue
func NewSafeQueue() *SafeQueue {
sq := &SafeQueue{}
sq.que = queue.New()
return sq
}

// Init implement init
func (sq *SafeQueue) Init() {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.que.Init()
}

// PushBack implement PushBack
func (sq *SafeQueue) PushBack(v interface{}) {
sq.mu.Lock()
defer sq.mu.Unlock()
sq.que.PushBack(v)
}

// PopFront implement PopFront
func (sq *SafeQueue) PopFront() interface{} {
sq.mu.Lock()
defer sq.mu.Unlock()
return sq.que.PopFront()
}
28 changes: 28 additions & 0 deletions server/statistics/queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package statistics

import (
. "github.com/pingcap/check"
)

func (t *testMovingAvg) TestQueue(c *C) {
sq := NewSafeQueue()
sq.PushBack(1)
sq.PushBack(2)
v1 := sq.PopFront()
v2 := sq.PopFront()
c.Assert(1, Equals, v1.(int))
c.Assert(2, Equals, v2.(int))
}

0 comments on commit 05b9d6d

Please sign in to comment.