diff --git a/pkg/mcs/resource_manager/client/client.go b/pkg/mcs/resource_manager/client/client.go
index 58db0444296..f9b80fba0b3 100644
--- a/pkg/mcs/resource_manager/client/client.go
+++ b/pkg/mcs/resource_manager/client/client.go
@@ -17,6 +17,7 @@ package client
 import (
     "context"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/pingcap/errors"
@@ -351,6 +352,9 @@ type groupCostController struct {
         consumption *rmpb.Consumption
     }
 
+    // burstable is a fast path: when the burst limit is unlimited, the token limiters are skipped.
+    burstable *atomic.Bool
+
     lowRUNotifyChan chan struct{}
     // run contains the state that is updated by the main loop.
     run struct {
@@ -404,6 +408,7 @@ func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNot
         calculators:     []ResourceCalculator{newKVCalculator(mainCfg), newSQLCalculator(mainCfg)},
         mode:            group.GetMode(),
         lowRUNotifyChan: lowRUNotifyChan,
+        burstable:       &atomic.Bool{},
     }
 
     switch gc.mode {
@@ -431,7 +436,7 @@ func (gc *groupCostController) initRunState() {
     gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
     for typ := range requestUnitList {
         counter := &tokenCounter{
-            limiter:     NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
+            limiter:     NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
             avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
             avgLastTime: now,
         }
@@ -441,7 +446,7 @@ func (gc *groupCostController) initRunState() {
     gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
     for typ := range requestResourceList {
         counter := &tokenCounter{
-            limiter:     NewLimiter(now, 0, initialRequestUnits, gc.lowRUNotifyChan),
+            limiter:     NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
             avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
             avgLastTime: now,
         }
@@ -516,21 +521,31 @@ func (gc *groupCostController) handleTokenBucketTrickEvent(ctx context.Context)
 }
 
 func (gc *groupCostController) updateAvgRaWResourcePerSec() {
+    isBurstable := true
     for typ, counter := range gc.run.resourceTokens {
+        if counter.limiter.GetBurst() >= 0 {
+            isBurstable = false
+        }
         if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) {
             continue
         }
         log.Debug("[resource group controller] update avg raw resource per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
     }
+    gc.burstable.Store(isBurstable)
 }
 
 func (gc *groupCostController) updateAvgRUPerSec() {
+    isBurstable := true
     for typ, counter := range gc.run.requestUnitTokens {
+        if counter.limiter.GetBurst() >= 0 {
+            isBurstable = false
+        }
         if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) {
             continue
         }
         log.Debug("[resource group controller] update avg ru per sec", zap.String("name", gc.Name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avgRUPerSec", counter.avgRUPerSec))
     }
+    gc.burstable.Store(isBurstable)
 }
 
 func (gc *groupCostController) calcAvg(counter *tokenCounter, new float64) bool {
@@ -623,6 +638,7 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
     }
 
     var cfg tokenBucketReconfigureArgs
+    cfg.NewBurst = bucket.GetSettings().GetBurstLimit()
     // when trickleTimeMs equals zero, server has enough tokens and does not need to
     // limit client consume token. So all token is granted to client right now.
     if trickleTimeMs == 0 {
@@ -723,6 +739,9 @@ func (gc *groupCostController) onRequestWait(
         calc.BeforeKVRequest(delta, info)
     }
     now := time.Now()
+    if gc.burstable.Load() {
+        goto ret
+    }
     // retry
 retryLoop:
     for i := 0; i < maxRetry; i++ {
@@ -753,6 +772,7 @@ retryLoop:
     if err != nil {
         return err
     }
+ret:
     gc.mu.Lock()
     add(gc.mu.consumption, delta)
     gc.mu.Unlock()
@@ -764,7 +784,9 @@ func (gc *groupCostController) onResponse(req RequestInfo, resp ResponseInfo) {
     for _, calc := range gc.calculators {
         calc.AfterKVRequest(delta, req, resp)
     }
-
+    if gc.burstable.Load() {
+        goto ret
+    }
     switch gc.mode {
     case rmpb.GroupMode_RawMode:
         for typ, counter := range gc.run.resourceTokens {
@@ -779,6 +801,7 @@ func (gc *groupCostController) onResponse(req RequestInfo, resp ResponseInfo) {
             }
         }
     }
+ret:
     gc.mu.Lock()
     add(gc.mu.consumption, delta)
     gc.mu.Unlock()
diff --git a/pkg/mcs/resource_manager/client/client_test.go b/pkg/mcs/resource_manager/client/client_test.go
new file mode 100644
index 00000000000..f04fff05f3d
--- /dev/null
+++ b/pkg/mcs/resource_manager/client/client_test.go
@@ -0,0 +1,54 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+    "testing"
+    "time"
+
+    rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
+    "github.com/stretchr/testify/require"
+)
+
+func TestGroupControlBurstable(t *testing.T) {
+    re := require.New(t)
+    group := &rmpb.ResourceGroup{
+        Name: "test",
+        Mode: rmpb.GroupMode_RUMode,
+        RUSettings: &rmpb.GroupRequestUnitSettings{
+            RRU: &rmpb.TokenBucket{
+                Settings: &rmpb.TokenLimitSettings{
+                    FillRate: 1000,
+                },
+            },
+        },
+    }
+    ch := make(chan struct{})
+    gc := newGroupCostController(group, DefaultConfig(), ch)
+    gc.initRunState()
+    args := tokenBucketReconfigureArgs{
+        NewRate:  1000,
+        NewBurst: -1,
+    }
+    for _, counter := range gc.run.requestUnitTokens {
+        counter.limiter.Reconfigure(time.Now(), args)
+    }
+    gc.updateAvgRequestResourcePerSec()
+    re.Equal(gc.burstable.Load(), true)
+}
diff --git a/pkg/mcs/resource_manager/client/limiter.go b/pkg/mcs/resource_manager/client/limiter.go
index 33eb46bbf0a..9c0940ad2db 100644
--- a/pkg/mcs/resource_manager/client/limiter.go
+++ b/pkg/mcs/resource_manager/client/limiter.go
@@ -60,10 +60,16 @@ func Every(interval time.Duration) Limit {
 // If no token is available, Reserve returns a reservation for a future token
 // and the amount of time the caller must wait before using it,
 // or its associated context.Context is canceled.
+//
+// Notes on burst (b):
+//   - If b == 0, the limiter has no capacity cap but is still limited by the fill rate r; this is the default in the resource controller (bursts are bounded by the accumulated tokens).
+//   - If b < 0, the limiter has no capacity cap and r is ignored, which can be treated as r == Inf (unlimited burst).
+//   - If b > 0, the limiter has a bounded capacity (currently not used).
 type Limiter struct {
     mu     sync.Mutex
     limit  Limit
     tokens float64
+    burst  int64
     // last is the last time the limiter's tokens field was updated
     last            time.Time
     notifyThreshold float64
@@ -83,11 +89,12 @@ func (lim *Limiter) Limit() Limit {
 
 // NewLimiter returns a new Limiter that allows events up to rate r and permits
 // bursts of at most b tokens.
-func NewLimiter(now time.Time, r Limit, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
+func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotifyChan chan struct{}) *Limiter {
     lim := &Limiter{
         limit:               r,
         last:                now,
         tokens:              tokens,
+        burst:               b,
         lowTokensNotifyChan: lowTokensNotifyChan,
     }
     log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
@@ -234,6 +241,13 @@ func (lim *Limiter) IsLowTokens() bool {
     return lim.isLowTokensLocked()
 }
 
+// GetBurst returns the burst size of the limiter
+func (lim *Limiter) GetBurst() int64 {
+    lim.mu.Lock()
+    defer lim.mu.Unlock()
+    return lim.burst
+}
+
 // RemoveTokens decreases the amount of tokens currently available.
 func (lim *Limiter) RemoveTokens(now time.Time, amount float64) {
     lim.mu.Lock()
@@ -249,6 +263,8 @@ type tokenBucketReconfigureArgs struct {
     NewRate float64
 
+    NewBurst int64
+
     NotifyThreshold float64
 }
@@ -261,6 +277,7 @@ func (lim *Limiter) Reconfigure(now time.Time, args tokenBucketReconfigureArgs)
     lim.last = now
     lim.tokens = tokens + args.NewTokens
     lim.limit = Limit(args.NewRate)
+    lim.burst = args.NewBurst
     lim.notifyThreshold = args.NotifyThreshold
     lim.isLowProcess = false
     lim.maybeNotify()
@@ -282,7 +299,7 @@ func (lim *Limiter) reserveN(now time.Time, n float64, maxFutureReserve time.Dur
     lim.mu.Lock()
     defer lim.mu.Unlock()
 
-    if lim.limit == Inf {
+    if lim.limit == Inf || lim.burst < 0 {
         return Reservation{
             ok:  true,
             lim: lim,
diff --git a/pkg/mcs/resource_manager/client/limiter_test.go b/pkg/mcs/resource_manager/client/limiter_test.go
index b308514fa6c..f3f53206776 100644
--- a/pkg/mcs/resource_manager/client/limiter_test.go
+++ b/pkg/mcs/resource_manager/client/limiter_test.go
@@ -1,4 +1,4 @@
-// Copyright 2015 The Go Authors. All rights reserved.
+// Copyright 2023 The Go Authors. All rights reserved.
 // Use of this source code is governed by a BSD-style
 // license that can be found in the LICENSE file.
@@ -83,7 +83,7 @@ func checkTokens(re *require.Assertions, lim *Limiter, t time.Time, expected flo
 }
 
 func TestSimpleReserve(t *testing.T) {
-    lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))
+    lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
 
     runReserveMax(t, lim, request{t0, 3, t1, true})
     runReserveMax(t, lim, request{t0, 3, t4, true})
@@ -93,25 +93,33 @@ func TestSimpleReserve(t *testing.T) {
     runReserve(t, lim, request{t5, 2000, t6, false}, time.Second*100)
     runReserve(t, lim, request{t3, 2, t8, true}, time.Second*8)
+    // unlimited
+    args := tokenBucketReconfigureArgs{
+        NewBurst: -1,
+    }
+    lim.Reconfigure(t1, args)
+    runReserveMax(t, lim, request{t5, 2000, t5, true})
 }
 
 func TestReconfig(t *testing.T) {
     re := require.New(t)
-    lim := NewLimiter(t0, 1, 2, make(chan struct{}, 1))
+    lim := NewLimiter(t0, 1, 0, 2, make(chan struct{}, 1))
 
     runReserveMax(t, lim, request{t0, 4, t2, true})
     args := tokenBucketReconfigureArgs{
         NewTokens: 6.,
         NewRate:   2,
+        NewBurst:  -1,
     }
     lim.Reconfigure(t1, args)
     checkTokens(re, lim, t1, 5)
     checkTokens(re, lim, t2, 7)
+    re.Equal(int64(-1), lim.GetBurst())
 }
 
 func TestNotify(t *testing.T) {
     nc := make(chan struct{}, 1)
-    lim := NewLimiter(t0, 1, 0, nc)
+    lim := NewLimiter(t0, 1, 0, 0, nc)
 
     args := tokenBucketReconfigureArgs{
         NewTokens: 1000.,
@@ -132,8 +140,8 @@ func TestCancel(t *testing.T) {
     ctx1, cancel1 := context.WithDeadline(ctx, t2)
     re := require.New(t)
     nc := make(chan struct{}, 1)
-    lim1 := NewLimiter(t0, 1, 10, nc)
-    lim2 := NewLimiter(t0, 1, 0, nc)
+    lim1 := NewLimiter(t0, 1, 0, 10, nc)
+    lim2 := NewLimiter(t0, 1, 0, 0, nc)
 
     r1 := runReserveMax(t, lim1, request{t0, 5, t0, true})
     checkTokens(re, lim1, t0, 5)
diff --git a/pkg/mcs/resource_manager/server/token_bukets.go b/pkg/mcs/resource_manager/server/token_bukets.go
index 9ca81ed5dae..17af3ce535b 100644
--- a/pkg/mcs/resource_manager/server/token_bukets.go
+++ b/pkg/mcs/resource_manager/server/token_bukets.go
@@ -103,7 +103,7 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
     }
 
     var res rmpb.TokenBucket
-    res.Settings = &rmpb.TokenLimitSettings{}
+    res.Settings = &rmpb.TokenLimitSettings{BurstLimit: t.GetSettings().GetBurstLimit()}
     // FillRate is used for the token server unavailable in abnormal situation.
     if neededTokens <= 0 {
         return &res, 0
diff --git a/tests/msc/resource_manager/resource_manager_test.go b/tests/msc/resource_manager/resource_manager_test.go
index 8b45125574f..1fcea64ead9 100644
--- a/tests/msc/resource_manager/resource_manager_test.go
+++ b/tests/msc/resource_manager/resource_manager_test.go
@@ -95,13 +95,15 @@ func (suite *resourceManagerClientTestSuite) SetupSuite() {
             RUSettings: &rmpb.GroupRequestUnitSettings{
                 RRU: &rmpb.TokenBucket{
                     Settings: &rmpb.TokenLimitSettings{
-                        FillRate: 40000,
+                        FillRate:   40000,
+                        BurstLimit: -1,
                     },
                     Tokens: 100000,
                 },
                 WRU: &rmpb.TokenBucket{
                     Settings: &rmpb.TokenLimitSettings{
-                        FillRate: 20000,
+                        FillRate:   20000,
+                        BurstLimit: -1,
                     },
                     Tokens: 50000,
                 },
@@ -382,6 +384,9 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
         for _, resp := range aresp {
             re.Len(resp.GrantedRUTokens, 1)
             re.Equal(resp.GrantedRUTokens[0].GrantedTokens.Tokens, float64(100.))
+            if resp.ResourceGroupName == "test2" {
+                re.Equal(int64(-1), resp.GrantedRUTokens[0].GrantedTokens.GetSettings().GetBurstLimit())
+            }
         }
         gresp, err := cli.GetResourceGroup(suite.ctx, groups[0].GetName())
         re.NoError(err)
@@ -391,6 +396,7 @@ func (suite *resourceManagerClientTestSuite) TestAcquireTokenBucket() {
         re.Equal(g1.GetName(), g2.GetName())
         re.Equal(g1.GetMode(), g2.GetMode())
         re.Equal(g1.GetRUSettings().RRU.Settings.FillRate, g2.GetRUSettings().RRU.Settings.FillRate)
+        re.Equal(g1.GetRUSettings().RRU.Settings.BurstLimit, g2.GetRUSettings().RRU.Settings.BurstLimit)
         // now we don't persistent tokens in running state, so tokens is original.
         re.Equal(g1.GetRUSettings().RRU.Tokens, g2.GetRUSettings().RRU.Tokens)
         re.NoError(err)
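
Reviewer note: a minimal, self-contained sketch of the burst convention this patch introduces (b == 0: no capacity cap, but still limited by the fill rate; b < 0: the rate is ignored and reservations never wait; b > 0: a capped bucket, not used yet). It deliberately does not call the real `Limiter`; `miniBucket` and `waitDuration` are invented names for illustration only.

```go
package main

import (
	"fmt"
	"time"
)

// miniBucket is a toy token bucket that mirrors the burst convention above;
// it is NOT the real client Limiter from this patch.
type miniBucket struct {
	rate   float64   // tokens refilled per second
	burst  int64     // 0: no cap; <0: unlimited, rate ignored; >0: capped capacity
	tokens float64   // may go negative to represent a debt
	last   time.Time // last refill time
}

// waitDuration reserves n tokens and reports how long the caller must wait.
func (b *miniBucket) waitDuration(now time.Time, n float64) time.Duration {
	if b.burst < 0 {
		// Unlimited burst: behave as if the rate were Inf and never wait.
		return 0
	}
	// Refill tokens accumulated since the last update.
	b.tokens += b.rate * now.Sub(b.last).Seconds()
	if b.burst > 0 && b.tokens > float64(b.burst) {
		b.tokens = float64(b.burst) // capped capacity (not used by the controller yet)
	}
	b.last = now
	b.tokens -= n
	if b.tokens >= 0 {
		return 0
	}
	// The debt is paid back at the fill rate.
	return time.Duration(-b.tokens / b.rate * float64(time.Second))
}

func main() {
	now := time.Now()
	limited := &miniBucket{rate: 1000, burst: 0, last: now}
	unlimited := &miniBucket{rate: 1000, burst: -1, last: now}

	fmt.Println(limited.waitDuration(now, 2000))   // ~2s: wait for the fill rate to cover the debt
	fmt.Println(unlimited.waitDuration(now, 2000)) // 0s: burst < 0 skips waiting entirely
}
```

The same convention flows end to end in the patch: the server copies the group's `BurstLimit` into every granted `TokenBucket`, `Reconfigure` stores it in the limiter, and the client flips `burstable` once every counter reports a negative burst so that `onRequestWait` and `onResponse` can skip the token-waiting path.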