diff --git a/client/resource_group/controller/controller.go b/client/resource_group/controller/controller.go index 7efef3c2b964..5fac13306639 100644 --- a/client/resource_group/controller/controller.go +++ b/client/resource_group/controller/controller.go @@ -961,7 +961,7 @@ func (gc *groupCostController) updateRunState() { } *gc.run.consumption = *gc.mu.consumption gc.mu.Unlock() - logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption)) + logControllerTrace("[resource group controller] update run state", zap.String("name", gc.name), zap.Any("request-unit-consumption", gc.run.consumption), zap.Bool("is-throttled", gc.isThrottled.Load())) gc.run.now = newTime } @@ -1042,7 +1042,7 @@ func (gc *groupCostController) updateAvgRaWResourcePerSec() { if !gc.calcAvg(counter, getRawResourceValueFromConsumption(gc.run.consumption, typ)) { continue } - logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) + logControllerTrace("[resource group controller] update avg raw resource per sec", zap.String("name", gc.name), zap.String("type", rmpb.RawResourceType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load())) } gc.burstable.Store(isBurstable) } @@ -1056,7 +1056,7 @@ func (gc *groupCostController) updateAvgRUPerSec() { if !gc.calcAvg(counter, getRUValueFromConsumption(gc.run.consumption, typ)) { continue } - logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec)) + logControllerTrace("[resource group controller] update avg ru per sec", zap.String("name", gc.name), zap.String("type", rmpb.RequestUnitType_name[int32(typ)]), zap.Float64("avg-ru-per-sec", counter.avgRUPerSec), zap.Bool("is-throttled", gc.isThrottled.Load())) } gc.burstable.Store(isBurstable) } @@ -1343,6 +1343,54 @@ func (gc *groupCostController) calcRequest(counter *tokenCounter) float64 { return value } +func (gc *groupCostController) acquireTokens(ctx context.Context, delta *rmpb.Consumption, waitDuration *time.Duration, allowDebt bool) (time.Duration, error) { + gc.metrics.runningKVRequestCounter.Inc() + defer gc.metrics.runningKVRequestCounter.Dec() + var ( + err error + d time.Duration + ) +retryLoop: + for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ { + now := time.Now() + switch gc.mode { + case rmpb.GroupMode_RawMode: + res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) + for typ, counter := range gc.run.resourceTokens { + if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) + } + } + if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { + break retryLoop + } + case rmpb.GroupMode_RUMode: + res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) + for typ, counter := range gc.run.requestUnitTokens { + if v := getRUValueFromConsumption(delta, typ); v > 0 { + // record the consume token histogram if enable controller debug mode. + if enableControllerTraceLog.Load() { + gc.metrics.consumeTokenHistogram.Observe(v) + } + // allow debt for small request or not in throttled. remove tokens directly. + if allowDebt { + counter.limiter.RemoveTokens(now, v) + break retryLoop + } + res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) + } + } + if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { + break retryLoop + } + } + gc.metrics.requestRetryCounter.Inc() + time.Sleep(gc.mainCfg.WaitRetryInterval) + *waitDuration += gc.mainCfg.WaitRetryInterval + } + return d, err +} + func (gc *groupCostController) onRequestWaitImpl( ctx context.Context, info RequestInfo, ) (*rmpb.Consumption, *rmpb.Consumption, time.Duration, uint32, error) { @@ -1357,44 +1405,8 @@ func (gc *groupCostController) onRequestWaitImpl( var waitDuration time.Duration if !gc.burstable.Load() { - var err error - var i int - var d time.Duration - gc.metrics.runningKVRequestCounter.Inc() - defer gc.metrics.runningKVRequestCounter.Dec() - retryLoop: - for i = 0; i < gc.mainCfg.WaitRetryTimes; i++ { - now := time.Now() - switch gc.mode { - case rmpb.GroupMode_RawMode: - res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) - for typ, counter := range gc.run.resourceTokens { - if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) - } - } - if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { - break retryLoop - } - case rmpb.GroupMode_RUMode: - res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) - for typ, counter := range gc.run.requestUnitTokens { - if v := getRUValueFromConsumption(delta, typ); v > 0 { - // record the consume token histogram if enable controller debug mode. - if enableControllerTraceLog.Load() { - gc.metrics.consumeTokenHistogram.Observe(v) - } - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) - } - } - if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { - break retryLoop - } - } - gc.metrics.requestRetryCounter.Inc() - time.Sleep(gc.mainCfg.WaitRetryInterval) - waitDuration += gc.mainCfg.WaitRetryInterval - } + allowDebt := delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() + d, err := gc.acquireTokens(ctx, delta, &waitDuration, allowDebt) if err != nil { if errs.ErrClientResourceGroupThrottled.Equal(err) { gc.metrics.failedRequestCounterWithThrottled.Inc() @@ -1479,57 +1491,9 @@ func (gc *groupCostController) onResponseWaitImpl( for _, calc := range gc.calculators { calc.AfterKVRequest(delta, req, resp) } - var ( - waitDuration time.Duration - d time.Duration - err error - ) + var waitDuration time.Duration if !gc.burstable.Load() { - gc.metrics.runningKVRequestCounter.Inc() - defer gc.metrics.runningKVRequestCounter.Dec() - retryLoop: - for i := 0; i < gc.mainCfg.WaitRetryTimes; i++ { - now := time.Now() - switch gc.mode { - case rmpb.GroupMode_RawMode: - res := make([]*Reservation, 0, len(requestResourceLimitTypeList)) - for typ, counter := range gc.run.resourceTokens { - if v := getRawResourceValueFromConsumption(delta, typ); v > 0 { - // allow debt for small request or not in throttled. - if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() { - counter.limiter.RemoveTokens(time.Now(), v) - break retryLoop - } - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) - } - } - if d, err = WaitReservations(ctx, now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { - break retryLoop - } - case rmpb.GroupMode_RUMode: - res := make([]*Reservation, 0, len(requestUnitLimitTypeList)) - for typ, counter := range gc.run.requestUnitTokens { - if v := getRUValueFromConsumption(delta, typ); v > 0 { - // record the consume token histogram if enable controller debug mode. - if enableControllerTraceLog.Load() { - gc.metrics.consumeTokenHistogram.Observe(v) - } - // allow debt for small request or not in throttled. - if delta.ReadBytes+delta.WriteBytes < bigRequestThreshold || !gc.isThrottled.Load() { - counter.limiter.RemoveTokens(time.Now(), v) - break retryLoop - } - res = append(res, counter.limiter.Reserve(ctx, gc.mainCfg.LTBMaxWaitDuration, now, v)) - } - } - if d, err = WaitReservations(context.Background(), now, res); err == nil || errs.ErrClientResourceGroupThrottled.NotEqual(err) { - break retryLoop - } - } - gc.metrics.requestRetryCounter.Inc() - time.Sleep(gc.mainCfg.WaitRetryInterval) - waitDuration += gc.mainCfg.WaitRetryInterval - } + d, err := gc.acquireTokens(ctx, delta, &waitDuration, false) if err != nil { if errs.ErrClientResourceGroupThrottled.Equal(err) { gc.metrics.failedRequestCounterWithThrottled.Inc() diff --git a/client/resource_group/controller/limiter.go b/client/resource_group/controller/limiter.go index 48cdb082e77f..5d9823312ca8 100644 --- a/client/resource_group/controller/limiter.go +++ b/client/resource_group/controller/limiter.go @@ -335,7 +335,7 @@ func (lim *Limiter) Reconfigure(now time.Time, ) { lim.mu.Lock() defer lim.mu.Unlock() - logControllerTrace("[resource group controller] before reconfigure", zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", lim.notifyThreshold), zap.Int64("old-burst", lim.burst)) + logControllerTrace("[resource group controller] before reconfigure", zap.String("name", lim.name), zap.Float64("old-tokens", lim.tokens), zap.Float64("old-rate", float64(lim.limit)), zap.Float64("old-notify-threshold", lim.notifyThreshold), zap.Int64("old-burst", lim.burst)) if args.NewBurst < 0 { lim.last = now lim.tokens = args.NewTokens