Optimize allocation behavior of the autoscaler (#10375)
* Optimize allocation behavior of the autoscaler

This optimizes the autoscaler's allocation behavior by
1. Collapsing the metric record calls into a single batch per scaling decision, since that path is relatively expensive
2. Avoiding building a new logger, instead attaching the metric key as a field only where relevant
3. Avoiding building new loggers for the stable and panic debug logs
4. Avoiding formatting log messages by checking the log level first (see the sketch after these notes)

* Add a simple autoscaler benchmark (an illustrative sketch of such a benchmark appears after the diff below)

* Use variable
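
For reference, the level-check pattern from item 4 looks roughly like this in isolation. A minimal sketch, assuming a stock zap production logger; only Desugar, Core().Enabled, and Debug mirror the calls in the diff below, while the logger setup and field values are illustrative:

package main

import (
	"fmt"

	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	base, _ := zap.NewProduction() // production config: debug level disabled
	defer base.Sync()
	sugared := base.Sugar() // stands in for the logger from logging.FromContext(ctx)

	desugared := sugared.Desugar()
	debugEnabled := desugared.Core().Enabled(zapcore.DebugLevel)

	dspc, dppc := 3.0, 5.0
	if debugEnabled {
		// Only pay for fmt.Sprintf, and for boxing its arguments into
		// interface values, when the line will actually be emitted.
		desugared.Debug(fmt.Sprintf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f", dspc, dppc),
			zap.String("metric", "concurrency"))
	}
}

Attaching the metric key as a zap.String field on each call, rather than via logger.With, is what avoids building a new logger per Scale invocation (items 2 and 3).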
markusthoemmes authored Dec 15, 2020
1 parent e1e77b8 commit d2a5fde
Showing 1 changed file with 38 additions and 18 deletions.
pkg/autoscaler/scaling/autoscaler.go
@@ -19,11 +19,13 @@ package scaling
 import (
 	"context"
 	"errors"
+	"fmt"
 	"math"
 	"sync"
 	"time"

 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"

 	"knative.dev/pkg/logging"
 	pkgmetrics "knative.dev/pkg/metrics"

@@ -141,6 +143,8 @@ func (a *autoscaler) Update(deciderSpec *DeciderSpec) {
 // regards to acquiring the decider spec.
 func (a *autoscaler) Scale(ctx context.Context, now time.Time) ScaleResult {
 	logger := logging.FromContext(ctx)
+	desugared := logger.Desugar()
+	debugEnabled := desugared.Core().Enabled(zapcore.DebugLevel)

 	spec := a.currentSpec()
 	originalReadyPodsCount, err := a.podCounter.ReadyCount()

@@ -159,18 +163,11 @@ func (a *autoscaler) Scale(ctx context.Context, now time.Time) ScaleResult {
 	switch spec.ScalingMetric {
 	case autoscaling.RPS:
 		observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicRPS(metricKey, now)
-		pkgmetrics.RecordBatch(a.reporterCtx, stableRPSM.M(observedStableValue), panicRPSM.M(observedStableValue),
-			targetRPSM.M(spec.TargetValue))
 	default:
 		metricName = autoscaling.Concurrency // concurrency is used by default
 		observedStableValue, observedPanicValue, err = a.metricClient.StableAndPanicConcurrency(metricKey, now)
-		pkgmetrics.RecordBatch(a.reporterCtx, stableRequestConcurrencyM.M(observedStableValue),
-			panicRequestConcurrencyM.M(observedPanicValue), targetRequestConcurrencyM.M(spec.TargetValue))
 	}

-	// Put the scaling metric to logs.
-	logger = logger.With(zap.String("metric", metricName))
-
 	if err != nil {
 		if errors.Is(err, metrics.ErrNoData) {
 			logger.Debug("No data to scale on yet")

@@ -194,17 +191,21 @@ func (a *autoscaler) Scale(ctx context.Context, now time.Time) ScaleResult {

 	dspc := math.Ceil(observedStableValue / spec.TargetValue)
 	dppc := math.Ceil(observedPanicValue / spec.TargetValue)
-	logger.Debugf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, ReadyEndpointCount = %d, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",
-		dspc, dppc, originalReadyPodsCount, maxScaleUp, maxScaleDown)
+	if debugEnabled {
+		desugared.Debug(fmt.Sprintf("DesiredStablePodCount = %0.3f, DesiredPanicPodCount = %0.3f, ReadyEndpointCount = %d, MaxScaleUp = %0.3f, MaxScaleDown = %0.3f",
+			dspc, dppc, originalReadyPodsCount, maxScaleUp, maxScaleDown), zap.String("metric", metricName))
+	}

 	// We want to keep desired pod count in the [maxScaleDown, maxScaleUp] range.
 	desiredStablePodCount := int32(math.Min(math.Max(dspc, maxScaleDown), maxScaleUp))
 	desiredPanicPodCount := int32(math.Min(math.Max(dppc, maxScaleDown), maxScaleUp))

-	logger.With(zap.String("mode", "stable")).Debugf("Observed average scaling metric value: %0.3f, targeting %0.3f.",
-		observedStableValue, spec.TargetValue)
-	logger.With(zap.String("mode", "panic")).Debugf("Observed average scaling metric value: %0.3f, targeting %0.3f.",
-		observedPanicValue, spec.TargetValue)
+	if debugEnabled {
+		desugared.Debug(fmt.Sprintf("Observed average scaling metric value: %0.3f, targeting %0.3f.",
+			observedStableValue, spec.TargetValue), zap.String("mode", "stable"), zap.String("metric", metricName))
+		desugared.Debug(fmt.Sprintf("Observed average scaling metric value: %0.3f, targeting %0.3f.",
+			observedPanicValue, spec.TargetValue), zap.String("mode", "panic"), zap.String("metric", metricName))
+	}

 	isOverPanicThreshold := dppc/readyPodsCount >= spec.PanicThreshold

@@ -290,12 +291,31 @@ func (a *autoscaler) Scale(ctx context.Context, now time.Time) ScaleResult {
 		numAct = int32(math.Max(MinActivators,
 			math.Ceil(float64(originalReadyPodsCount)*a.deciderSpec.TotalValue/a.deciderSpec.ActivatorCapacity)))
 	}
-	logger.Debugf("PodCount=%d Total1PodCapacity=%0.3f ObsStableValue=%0.3f ObsPanicValue=%0.3f TargetBC=%0.3f ExcessBC=%0.3f NumActivators=%d",
-		originalReadyPodsCount, a.deciderSpec.TotalValue, observedStableValue,
-		observedPanicValue, a.deciderSpec.TargetBurstCapacity, excessBCF, numAct)

-	pkgmetrics.RecordBatch(a.reporterCtx, excessBurstCapacityM.M(excessBCF),
-		desiredPodCountM.M(int64(desiredPodCount)))
+	if debugEnabled {
+		desugared.Debug(fmt.Sprintf("PodCount=%d Total1PodCapacity=%0.3f ObsStableValue=%0.3f ObsPanicValue=%0.3f TargetBC=%0.3f ExcessBC=%0.3f NumActivators=%d",
+			originalReadyPodsCount, a.deciderSpec.TotalValue, observedStableValue,
+			observedPanicValue, a.deciderSpec.TargetBurstCapacity, excessBCF, numAct), zap.String("metric", metricName))
+	}
+
+	switch spec.ScalingMetric {
+	case autoscaling.RPS:
+		pkgmetrics.RecordBatch(a.reporterCtx,
+			excessBurstCapacityM.M(excessBCF),
+			desiredPodCountM.M(int64(desiredPodCount)),
+			stableRPSM.M(observedStableValue),
+			panicRPSM.M(observedStableValue),
+			targetRPSM.M(spec.TargetValue),
+		)
+	default:
+		pkgmetrics.RecordBatch(a.reporterCtx,
+			excessBurstCapacityM.M(excessBCF),
+			desiredPodCountM.M(int64(desiredPodCount)),
+			stableRequestConcurrencyM.M(observedStableValue),
+			panicRequestConcurrencyM.M(observedPanicValue),
+			targetRequestConcurrencyM.M(spec.TargetValue),
+		)
+	}

 	return ScaleResult{
 		DesiredPodCount: desiredPodCount,
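
The commit message also mentions a simple autoscaler benchmark, which is not part of the single-file diff shown above. Purely as an illustration of the shape such a benchmark could take, here is a sketch in which the newAutoscalerForBench helper is an assumption, not code from this commit:

package scaling

import (
	"context"
	"testing"
	"time"
)

func BenchmarkAutoscalerScale(b *testing.B) {
	a := newAutoscalerForBench(b) // hypothetical helper wiring fake metrics and pod counters
	ctx := context.Background()
	now := time.Now()

	b.ReportAllocs()
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		a.Scale(ctx, now)
	}
}

Run with go test -bench=AutoscalerScale -benchmem to see allocations per scaling decision, which is what this commit reduces.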
