Skip to content

Commit

Permalink
resource_manager: make limit effective quickly and avoid jitter (#5958)
Browse files Browse the repository at this point in the history
close #5946, close #5954, close #5955

Signed-off-by: Cabinfever_B <cabinfeveroier@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
CabinfeverB and ti-chi-bot authored Feb 13, 2023
1 parent 1b1ed83 commit 7acfacb
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 23 deletions.
75 changes: 64 additions & 11 deletions client/resource_manager/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package client

import (
"context"
"math"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -133,15 +134,31 @@ func (c *ResourceGroupsController) Stop() error {
}

func (c *ResourceGroupsController) putResourceGroup(ctx context.Context, name string) (*groupCostController, error) {
// ref https://github.com/tikv/pd/issues/5955: since we don't introduce a lock here, we need to check the groupsController as often as possible
// to avoid fetching or creating the same resource group more than once. We check it in L139, L147 and L157.
if tmp, ok := c.groupsController.Load(name); ok {
gc := tmp.(*groupCostController)
return gc, nil
}
group, err := c.provider.GetResourceGroup(ctx, name)
if err != nil {
return nil, err
}
log.Info("create resource group cost controller", zap.String("name", group.GetName()))
gc := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.groupNotificationCh)
if tmp, ok := c.groupsController.Load(name); ok {
gc := tmp.(*groupCostController)
return gc, nil
}
gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.groupNotificationCh)
if err != nil {
return nil, err
}
// A future case: if the user changes the mode from RU to RAW, how do we re-init?
gc.initRunState()
c.groupsController.Store(group.GetName(), gc)
tmp, loaded := c.groupsController.LoadOrStore(group.GetName(), gc)
if !loaded {
log.Info("create resource group cost controller", zap.String("name", group.GetName()))
}
gc = tmp.(*groupCostController)
return gc, nil
}

Expand All @@ -152,8 +169,12 @@ func (c *ResourceGroupsController) updateAllResourceGroups(ctx context.Context)
}
latestGroups := make(map[string]struct{})
for _, group := range groups {
gc, err := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.groupNotificationCh)
if err != nil {
log.Warn("failed to create resource group cost controller", zap.Error(errors.Errorf("[resource group controller] %s is not existed or miss necessary configuration.", group.GetName())))
continue
}
log.Info("create resource group cost controller", zap.String("name", group.GetName()))
gc := newGroupCostController(group, c.config, c.lowTokenNotifyChan, c.groupNotificationCh)
c.groupsController.Store(group.GetName(), gc)
latestGroups[group.GetName()] = struct{}{}
}
Expand Down Expand Up @@ -318,7 +339,7 @@ func (c *ResourceGroupsController) OnRequestWait(
} else {
gc, err = c.putResourceGroup(ctx, resourceGroupName)
if err != nil {
return errors.Errorf("[resource group] resourceGroupName %s is not existed.", resourceGroupName)
return errors.Errorf("[resource group] %s is not existed or miss necessary configuration.", resourceGroupName)
}
}
err = gc.onRequestWait(ctx, info)
Expand Down Expand Up @@ -403,7 +424,16 @@ type tokenCounter struct {
limiter *Limiter
}

func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNotifyChan chan struct{}, groupNotificationCh chan *groupCostController) *groupCostController {
func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNotifyChan chan struct{}, groupNotificationCh chan *groupCostController) (*groupCostController, error) {
switch group.Mode {
case rmpb.GroupMode_RUMode:
if group.RUSettings.RU == nil || group.RUSettings.RU.Settings == nil {
return nil, errors.Errorf("the resource group is not configured")
}
default:
return nil, errors.New("not supports the resource type")
}

gc := &groupCostController{
ResourceGroup: group,
mainCfg: mainCfg,
Expand All @@ -422,7 +452,7 @@ func newGroupCostController(group *rmpb.ResourceGroup, mainCfg *Config, lowRUNot
}

gc.mu.consumption = &rmpb.Consumption{}
return gc
return gc, nil
}

func (gc *groupCostController) initRunState() {
Expand All @@ -434,23 +464,42 @@ func (gc *groupCostController) initRunState() {

gc.run.lastRequestConsumption = &rmpb.Consumption{}

cfgFunc := func(tb *rmpb.TokenBucket) tokenBucketReconfigureArgs {
cfg := tokenBucketReconfigureArgs{
NewTokens: initialRequestUnits,
NewBurst: tb.Settings.BurstLimit,
// This is to trigger token requests as soon as the resource group starts consuming tokens.
NotifyThreshold: math.Max(initialRequestUnits-float64(tb.Settings.FillRate)*0.2, 1),
}
if cfg.NewBurst >= 0 {
cfg.NewBurst = 0
}
return cfg
}

switch gc.mode {
case rmpb.GroupMode_RUMode:
gc.run.requestUnitTokens = make(map[rmpb.RequestUnitType]*tokenCounter)
for typ := range requestUnitLimitTypeList {
tb := getRUTokenBucketSetting(gc.ResourceGroup, typ)
cfg := cfgFunc(tb)
limiter := NewLimiterWithCfg(now, cfg, gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
}
gc.run.requestUnitTokens[typ] = counter
}
case rmpb.GroupMode_RawMode:
gc.run.resourceTokens = make(map[rmpb.RawResourceType]*tokenCounter)
for typ := range requestResourceLimitTypeList {
tb := getRawResourceTokenBucketSetting(gc.ResourceGroup, typ)
cfg := cfgFunc(tb)
limiter := NewLimiterWithCfg(now, cfg, gc.lowRUNotifyChan)
counter := &tokenCounter{
limiter: NewLimiter(now, 0, 0, initialRequestUnits, gc.lowRUNotifyChan),
avgRUPerSec: initialRequestUnits / gc.run.targetPeriod.Seconds() * 2,
limiter: limiter,
avgRUPerSec: 0,
avgLastTime: now,
}
gc.run.resourceTokens[typ] = counter
Expand Down Expand Up @@ -670,6 +719,10 @@ func (gc *groupCostController) modifyTokenCounter(counter *tokenCounter, bucket
cfg.NewRate = float64(bucket.GetSettings().FillRate)
cfg.NotifyThreshold = notifyThreshold
counter.lastDeadline = time.Time{}
// In the non-trickle case, clients can be allowed to accumulate more tokens.
if cfg.NewBurst >= 0 {
cfg.NewBurst = 0
}
} else {
// Otherwise the granted token is delivered to the client by fill rate.
cfg.NewTokens = 0
Expand Down
3 changes: 2 additions & 1 deletion client/resource_manager/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func TestGroupControlBurstable(t *testing.T) {
}
ch1 := make(chan struct{})
ch2 := make(chan *groupCostController)
gc := newGroupCostController(group, DefaultConfig(), ch1, ch2)
gc, err := newGroupCostController(group, DefaultConfig(), ch1, ch2)
re.NoError(err)
gc.initRunState()
args := tokenBucketReconfigureArgs{
NewRate: 1000,
Expand Down
26 changes: 23 additions & 3 deletions client/resource_manager/client/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func Every(interval time.Duration) Limit {
// or its associated context.Context is canceled.
//
// Some changes about burst(b):
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst within a capacity).
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and r is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b > 0, that means the limiter is limited capacity. (current not used).
// - If b > 0, that means the limiter is limited capacity.
type Limiter struct {
mu sync.Mutex
limit Limit
Expand Down Expand Up @@ -97,7 +97,22 @@ func NewLimiter(now time.Time, r Limit, b int64, tokens float64, lowTokensNotify
burst: b,
lowTokensNotifyChan: lowTokensNotifyChan,
}
log.Info("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
log.Debug("new limiter", zap.String("limiter", fmt.Sprintf("%+v", lim)))
return lim
}

// NewLimiterWithCfg builds a Limiter from a tokenBucketReconfigureArgs value.
//
// The limiter's rate, initial token count, burst and notify threshold are
// copied from cfg.NewRate, cfg.NewTokens, cfg.NewBurst and cfg.NotifyThreshold
// respectively. now is recorded as the limiter's last-update time, and
// lowTokensNotifyChan is stored as its low-token notification channel.
func NewLimiterWithCfg(now time.Time, cfg tokenBucketReconfigureArgs, lowTokensNotifyChan chan struct{}) *Limiter {
	limiter := new(Limiter)
	limiter.limit = Limit(cfg.NewRate)
	limiter.last = now
	limiter.tokens = cfg.NewTokens
	limiter.burst = cfg.NewBurst
	limiter.notifyThreshold = cfg.NotifyThreshold
	limiter.lowTokensNotifyChan = lowTokensNotifyChan

	// Record the freshly built limiter's full state at debug level.
	description := fmt.Sprintf("%+v", limiter)
	log.Debug("new limiter", zap.String("limiter", description))
	return limiter
}

Expand Down Expand Up @@ -354,6 +369,11 @@ func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time,
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if lim.burst != 0 {
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
}
return now, last, tokens
}

Expand Down
19 changes: 19 additions & 0 deletions client/resource_manager/client/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,13 @@ func getRUValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RequestUnitTyp
return 0
}

// getRUTokenBucketSetting returns the token bucket configured on group for the
// given request-unit type. Type 0 maps to group.RUSettings.RU; every other
// type yields nil. No nil checks are performed on group or its RUSettings
// field.
func getRUTokenBucketSetting(group *rmpb.ResourceGroup, typ rmpb.RequestUnitType) *rmpb.TokenBucket {
	switch typ {
	case 0:
		return group.RUSettings.RU
	default:
		return nil
	}
}

func getRawResourceValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RawResourceType) float64 {
switch typ {
case 0:
Expand All @@ -165,6 +172,18 @@ func getRawResourceValueFromConsumption(custom *rmpb.Consumption, typ rmpb.RawRe
return 0
}

// getRawResourceTokenBucketSetting returns the token bucket configured on
// group for the given raw resource type: type 0 maps to the Cpu bucket, 1 to
// IoRead and 2 to IoWrite, all read from group.RawResourceSettings. Any other
// type yields nil. No nil checks are performed on group or its
// RawResourceSettings field.
func getRawResourceTokenBucketSetting(group *rmpb.ResourceGroup, typ rmpb.RawResourceType) *rmpb.TokenBucket {
	// The settings field is only dereferenced inside a matching branch, so an
	// unrecognised type returns nil without touching group at all.
	if typ == 0 {
		return group.RawResourceSettings.Cpu
	}
	if typ == 1 {
		return group.RawResourceSettings.IoRead
	}
	if typ == 2 {
		return group.RawResourceSettings.IoWrite
	}
	return nil
}

func add(custom1 *rmpb.Consumption, custom2 *rmpb.Consumption) {
custom1.RRU += custom2.RRU
custom1.WRU += custom2.WRU
Expand Down
30 changes: 22 additions & 8 deletions pkg/mcs/resource_manager/server/token_bukets.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ const (
type GroupTokenBucket struct {
// Settings is the setting of TokenBucket.
// BurstLimit is used as below:
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within a unlimited capacity).
// - If b == 0, that means the limiter is unlimited capacity. default use in resource controller (burst with a rate within an unlimited capacity).
// - If b < 0, that means the limiter is unlimited capacity and fillrate(r) is ignored, can be seen as r == Inf (burst within an unlimited capacity).
// - If b > 0, that means the limiter is limited capacity. (current not used).
// - If b > 0, that means the limiter is limited capacity.
// MaxTokens limits the number of tokens that can be accumulated
Settings *rmpb.TokenLimitSettings `json:"settings,omitempty"`
GroupTokenBucketState `json:"state,omitempty"`
Expand All @@ -50,6 +50,8 @@ type GroupTokenBucketState struct {
Tokens float64 `json:"tokens,omitempty"`
LastUpdate *time.Time `json:"last_update,omitempty"`
Initialized bool `json:"initialized"`
// settingChanged is used to keep the number of returned tokens from jittering when the fill rate is changed.
settingChanged bool
}

// Clone returns the copy of GroupTokenBucketState
Expand Down Expand Up @@ -92,6 +94,7 @@ func (t *GroupTokenBucket) patch(tb *rmpb.TokenBucket) {
}
if setting := proto.Clone(tb.GetSettings()).(*rmpb.TokenLimitSettings); setting != nil {
t.Settings = setting
t.settingChanged = true
}

// the settings in token is delta of the last update and now.
Expand Down Expand Up @@ -121,6 +124,11 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
t.LastUpdate = &now
}
}
// Clear any outstanding loan (non-positive token balance) when the settings have changed, so lending starts afresh.
if t.settingChanged && t.Tokens <= 0 {
t.Tokens = 0
}
t.settingChanged = false
if t.Settings.BurstLimit != 0 {
if burst := float64(t.Settings.BurstLimit); t.Tokens > burst {
t.Tokens = burst
Expand Down Expand Up @@ -159,6 +167,12 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
var targetPeriodTime = time.Duration(targetPeriodMs) * time.Millisecond
var trickleTime = 0.

LoanCoefficient := defaultLoanCoefficient
// When BurstLimit is positive and less than or equal to FillRate, the server does not accumulate a significant number of tokens,
// so we don't need to smooth the token allocation speed.
if t.Settings.BurstLimit > 0 && t.Settings.BurstLimit <= int64(t.Settings.FillRate) {
LoanCoefficient = 1
}
// When there is a loan, the allotment will match the fill rate.
// We will have k threshold, beyond which the token allocation will be a minimum.
// The threshold unit is `fill rate * target period`.
Expand All @@ -173,18 +187,18 @@ func (t *GroupTokenBucket) request(now time.Time, neededTokens float64, targetPe
// |
// grant_rate 0 ------------------------------------------------------------------------------------
// loan *** k*period_token (k+k-1)*period_token *** (k+k+1...+1)*period_token
p := make([]float64, defaultLoanCoefficient)
p[0] = float64(defaultLoanCoefficient) * float64(t.Settings.FillRate) * targetPeriodTime.Seconds()
for i := 1; i < defaultLoanCoefficient; i++ {
p[i] = float64(defaultLoanCoefficient-i)*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() + p[i-1]
p := make([]float64, LoanCoefficient)
p[0] = float64(LoanCoefficient) * float64(t.Settings.FillRate) * targetPeriodTime.Seconds()
for i := 1; i < LoanCoefficient; i++ {
p[i] = float64(LoanCoefficient-i)*float64(t.Settings.FillRate)*targetPeriodTime.Seconds() + p[i-1]
}
for i := 0; i < defaultLoanCoefficient && neededTokens > 0 && trickleTime < targetPeriodTime.Seconds(); i++ {
for i := 0; i < LoanCoefficient && neededTokens > 0 && trickleTime < targetPeriodTime.Seconds(); i++ {
loan := -t.Tokens
if loan > p[i] {
continue
}
roundReserveTokens := p[i] - loan
fillRate := float64(defaultLoanCoefficient-i) * float64(t.Settings.FillRate)
fillRate := float64(LoanCoefficient-i) * float64(t.Settings.FillRate)
if roundReserveTokens > neededTokens {
t.Tokens -= neededTokens
grantedTokens += neededTokens
Expand Down

0 comments on commit 7acfacb

Please sign in to comment.