Skip to content

Commit

Permalink
resource_manager: supports burstable mode (#5881)
Browse files Browse the repository at this point in the history
ref #4399, ref pingcap/tidb#40380

resource_manager: supports burstable mode

Signed-off-by: nolouch <nolouch@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
nolouch and ti-chi-bot authored Jan 31, 2023
1 parent 62a7452 commit 43bf712
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 14 deletions.
29 changes: 26 additions & 3 deletions pkg/mcs/resource_manager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -351,6 +352,9 @@ type groupCostController struct {
consumption *rmpb.Consumption
}

// fast path to make once token limit with un-limit burst.
burstable *atomic.Bool

lowRUNotifyChan chan struct{}
// run contains the state that is updated by the main loop.
run struct {
Expand Down Expand Up @@ -404,6 +408,7 @@ func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNot
calculators: []ResourceCalculator{newKVCalculator(mainCfg), newSQLCalculator(mainCfg)},
mode: group.GetMode(),
lowRUNotifyChan: lowRUNotifyChan,
burstable: &atomic.Bool{},
}

switch gc.mode {
Expand Down Expand Up @@ -431,7 +436,7 @@ func (gc *groupCostController) initRunState() {
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitList {
counter := &tokenCounter{
limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
limiter: NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
avgLastTime: now,
}
Expand All @@ -441,7 +446,7 @@ func (gc *groupCostController) initRunState() {
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceList {
counter := &tokenCounter{
limiter: NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
limiter: NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
avgLastTime: now,
}
Expand Down Expand Up @@ -516,21 +521,31 @@ func (gc *groupCostController) handleTokenBucketTrickEvent(ctx context.Context)
}

func (gc *groupCostController) updateAvgRaWResourcePerSec() {
isBurstable := true
for typ, counter := range gc.run.resourceTokens {
if counter.limiter.GetBurst() >= 0 {
isBurstable = false
}
if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}

func (gc *groupCostController) updateAvgRUPerSec() {
isBurstable := true
for typ, counter := range gc.run.requestUnitTokens {
if counter.limiter.GetBurst() >= 0 {
isBurstable = false
}
if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
continue
}
log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
}
gc.burstable.Store(isBurstable)
}

func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
Expand Down Expand Up @@ -623,6 +638,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
}

var cfg tokenBucketReconfigureArgs
cfg.NewBurst = bucket.GetSettings().GetBurstLimit()
// when trickleTimeMs equals zero, server has enough tokens and does not need to
// limit client consume token. So all token is granted to client right now.
if trickleTimeMs == 0 {
Expand Down Expand Up @@ -723,6 +739,9 @@ func (gc *groupCostController) onRequestWait(
calc.BeforeKVRequest(delta, info)
}
now := time.Now()
if gc.burstable.Load() {
goto ret
}
// retry
retryLoop:
for i := 0; i < maxRetry; i++ {
Expand Down Expand Up @@ -753,6 +772,7 @@ retryLoop:
if err != nil {
return err
}
ret:
gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()
Expand All @@ -764,7 +784,9 @@ func (gc *groupCostController) onResponse(req RequestInfo, resp ResponseInfo) {
for _, calc := range gc.calculators {
calc.AfterKVRequest(delta, req, resp)
}

if gc.burstable.Load() {
goto ret
}
switch gc.mode {
case rmpb.GroupMode_RawMode:
for typ, counter := range gc.run.resourceTokens {
Expand All @@ -779,6 +801,7 @@ func (gc *groupCostController) onResponse(req RequestInfo, resp ResponseInfo) {
}
}
}
ret:
gc.mu.Lock()
add(gc.mu.consumption, delta)
gc.mu.Unlock()
Expand Down
54 changes: 54 additions & 0 deletions pkg/mcs/resource_manager/client/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,g
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package client

import (
"testing"
"time"

rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/stretchr/testify/require"
)

func TestGroupControlBurstable(t *testing.T) {
re := require.New(t)
group := &rmpb.ResourceGroup{
Name: "test",
Mode: rmpb.GroupMode_RUMode,
RUSettings: &rmpb.GroupRequestUnitSettings{
RRU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 1000,
},
},
},
}
ch := make(chan struct{})
gc := newGroupCostController(group, DefaultConfig(), ch)
gc.initRunState()
args := tokenBucketReconfigureArgs{
NewRate: 1000,
NewBurst: -1,
}
for _, counter := range gc.run.requestUnitTokens {
counter.limiter.Reconfigure(time.Now(), args)
}
gc.updateAvgRequestResourcePerSec()
re.Equal(gc.burstable.Load(), true)
}
21 changes: 19 additions & 2 deletions pkg/mcs/resource_manager/client/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,16 @@ func Every(interval time.Duration) Limit {
// If no token is available, Reserve returns a reservation for a future token
// and the amount of time the caller must wait before using it,
// or its associated context.Context is canceled.
//
// Some changes about burst(b):
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst within a capacity).
// - If b < 0, that means the limiter is unlimited capacity and r is ignored, can be seen as r == Inf (burst within a unlimited capacity).
// - If b > 0, that means the limiter is limited capacity. (current not used).
type Limiter struct {
mu sync.Mutex
limit Limit
tokens float64
burst int64
// last is the last time the limiter's tokens field was updated
last time.Time
notifyThreshold float64
Expand All @@ -83,11 +89,12 @@ func (lim *Limiter) Limit() Limit {

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(now time.Time, r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
lim := &Limiter{
limit: r,
last: now,
tokens: tokens,
burst: b,
lowTokensNotifyChan: lowTokensNotifyChan,
}
log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
Expand Down Expand Up @@ -234,6 +241,13 @@ func (lim *Limiter) IsLowTokens() bool {
return lim.isLowTokensLocked()
}

// GetBurst returns the burst size of the limiter
func (lim *Limiter) GetBurst() int64 {
lim.mu.Lock()
defer lim.mu.Unlock()
return lim.burst
}

// RemoveTokens decreases the amount of tokens currently available.
func (lim *Limiter) RemoveTokens(now time.Time, amount float64) {
lim.mu.Lock()
Expand All @@ -249,6 +263,8 @@ type tokenBucketReconfigureArgs struct {

NewRate float64

NewBurst int64

NotifyThreshold float64
}

Expand All @@ -261,6 +277,7 @@ func (lim *Limiter) Reconfigure(now time.Time, args tokenBucketReconfigureArgs)
lim.last = now
lim.tokens = tokens + args.NewTokens
lim.limit = Limit(args.NewRate)
lim.burst = args.NewBurst
lim.notifyThreshold = args.NotifyThreshold
lim.isLowProcess = false
lim.maybeNotify()
Expand All @@ -282,7 +299,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
lim.mu.Lock()
defer lim.mu.Unlock()

if lim.limit == Inf {
if lim.limit == Inf || lim.burst < 0 {
return Reservation{
ok: true,
lim: lim,
Expand Down
20 changes: 14 additions & 6 deletions pkg/mcs/resource_manager/client/limiter_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2015 The Go Authors. All rights reserved.
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

Expand Down Expand Up @@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
}

func TestSimpleReserve(t *testing.T) {
lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))

runReserveMax(t, lim, request{t0, 3, t1, true})
runReserveMax(t, lim, request{t0, 3, t4, true})
Expand All @@ -93,25 +93,33 @@ func TestSimpleReserve(t *testing.T) {
runReserve(t, lim, request{t5, 2000, t6, false}, time.Second*100)

runReserve(t, lim, request{t3, 2, t8, true}, time.Second*8)
// unlimited
args := tokenBucketReconfigureArgs{
NewBurst: -1,
}
lim.Reconfigure(t1, args)
runReserveMax(t, lim, request{t5, 2000, t5, true})
}

func TestReconfig(t *testing.T) {
re := require.New(t)
lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))
lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))

runReserveMax(t, lim, request{t0, 4, t2, true})
args := tokenBucketReconfigureArgs{
NewTokens: 6.,
NewRate: 2,
NewBurst: -1,
}
lim.Reconfigure(t1, args)
checkTokens(re, lim, t1, 5)
checkTokens(re, lim, t2, 7)
re.Equal(int64(-1), lim.GetBurst())
}

func TestNotify(t *testing.T) {
nc := make(chan struct{}, 1)
lim := NewLimiter(t0, 1, 0, nc)
lim := NewLimiter(t0, 1, 0, 0, nc)

args := tokenBucketReconfigureArgs{
NewTokens: 1000.,
Expand All @@ -132,8 +140,8 @@ func TestCancel(t *testing.T) {
ctx1, cancel1 := context.WithDeadline(ctx, t2)
re := require.New(t)
nc := make(chan struct{}, 1)
lim1 := NewLimiter(t0, 1, 10, nc)
lim2 := NewLimiter(t0, 1, 0, nc)
lim1 := NewLimiter(t0, 1, 0, 10, nc)
lim2 := NewLimiter(t0, 1, 0, 0, nc)

r1 := runReserveMax(t, lim1, request{t0, 5, t0, true})
checkTokens(re, lim1, t0, 5)
Expand Down
2 changes: 1 addition & 1 deletion pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
}

var res rmpb.TokenBucket
res.Settings = &rmpb.TokenLimitSettings{}
res.Settings = &rmpb.TokenLimitSettings{BurstLimit: t.GetSettings().GetBurstLimit()}
// FillRate is used for the token server unavailable in abnormal situation.
if neededTokens <= 0 {
return &res, 0
Expand Down
10 changes: 8 additions & 2 deletions tests/msc/resource_manager/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() {
RUSettings: &rmpb.GroupRequestUnitSettings{
RRU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 40000,
FillRate: 40000,
BurstLimit: -1,
},
Tokens: 100000,
},
WRU: &rmpb.TokenBucket{
Settings: &rmpb.TokenLimitSettings{
FillRate: 20000,
FillRate: 20000,
BurstLimit: -1,
},
Tokens: 50000,
},
Expand Down Expand Up @@ -382,6 +384,9 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
for _, resp := range aresp {
re.Len(resp.GrantedRUTokens, 1)
re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, float64(100.))
if resp.ResourceGroupName == "test2" {
re.Equal(int64(-1), resp.GrantedRUTokens[0].GrantedTokens.GetSettings().GetBurstLimit())
}
}
gresp, err := cli.GetResourceGroup(suite.ctx, groups[0].GetName())
re.NoError(err)
Expand All @@ -391,6 +396,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
re.Equal(g1.GetName(), g2.GetName())
re.Equal(g1.GetMode(), g2.GetMode())
re.Equal(g1.GetRUSettings().RRU.Settings.FillRate, g2.GetRUSettings().RRU.Settings.FillRate)
re.Equal(g1.GetRUSettings().RRU.Settings.BurstLimit, g2.GetRUSettings().RRU.Settings.BurstLimit)
// now we don't persistent tokens in running state, so tokens is original.
re.Equal(g1.GetRUSettings().RRU.Tokens, g2.GetRUSettings().RRU.Tokens)
re.NoError(err)
Expand Down

0 comments on commit 43bf712

Please sign in to comment.