From c9f6a0c28f77e98045dc50540ce079f1d367d452 Mon Sep 17 00:00:00 2001 From: Ryan Cumming Date: Sun, 3 Mar 2019 10:51:55 +1100 Subject: [PATCH] Add scale-in cooldown support This reimplements ASG scale-in cooldown inside the lambda. It takes two parameters which correspond to the existing ASG parameters in the elastic CI stack. 1. `SCALE_IN_COOLDOWN_PERIOD` is the cooldown time between scale-in events. This defaults to the existing 5 minutes. 2. `SCALE_IN_ADJUSTMENT` is the maximum adjustment during scale-in events. Unlike the ASG we may scale in less if we calculate that the desired capacity is closer than the adjustment. This defaults to the existing -1. This cheats a bit by storing `lastScaleInTime` in a global variable. This means we'll forget about our cooldown during a cold start. This should happen fairly infrequently and just make us a bit more aggressive about scaling in; it shouldn't affect correctness. --- lambda/main.go | 31 ++++++++++++++++++ main.go | 10 ++++++ scaler/scaler.go | 23 ++++++++++++++ scaler/scaler_test.go | 74 +++++++++++++++++++++++++++++++++++++++++-- template.yaml | 23 +++++++++++--- 5 files changed, 154 insertions(+), 7 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index f381962..57d27c0 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -17,6 +17,10 @@ import ( var invokeCount = 0 +// lastScaleInTime stores the last time we scaled in the ASG +// On a cold start this will be reset to the zero time.Time (January 1, year 1 UTC) +var lastScaleInTime time.Time + func main() { if os.Getenv(`DEBUG`) != "" { _, err := Handler(context.Background(), json.RawMessage([]byte{})) @@ -35,6 +39,8 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { var timeout <-chan time.Time = make(chan time.Time) var interval time.Duration = 10 * time.Second + var scaleInCooldownPeriod time.Duration = 5 * time.Minute + var scaleInAdjustment int64 = -1 if intervalStr := os.Getenv(`LAMBDA_INTERVAL`); intervalStr != "" { var err error @@ -52,6 +58,26 @@ func
Handler(ctx context.Context, evt json.RawMessage) (string, error) { timeout = time.After(timeoutDuration) } + if scaleInCooldownPeriodStr := os.Getenv(`SCALE_IN_COOLDOWN_PERIOD`); scaleInCooldownPeriodStr != "" { + var err error + scaleInCooldownPeriod, err = time.ParseDuration(scaleInCooldownPeriodStr) + if err != nil { + return "", err + } + } + + if scaleInAdjustmentStr := os.Getenv(`SCALE_IN_ADJUSTMENT`); scaleInAdjustmentStr != "" { + var err error + scaleInAdjustment, err = strconv.ParseInt(scaleInAdjustmentStr, 10, 64) + if err != nil { + return "", err + } + + if scaleInAdjustment >= 0 { + panic(fmt.Sprintf("Scale in adjustment (%d) must be negative", scaleInAdjustment)) + } + } + var mustGetEnv = func(env string) string { val := os.Getenv(env) if val == "" { @@ -80,6 +106,11 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { BuildkiteQueue: mustGetEnv(`BUILDKITE_QUEUE`), AutoScalingGroupName: mustGetEnv(`ASG_NAME`), AgentsPerInstance: mustGetEnvInt(`AGENTS_PER_INSTANCE`), + ScaleInParams: scaler.ScaleInParams{ + CooldownPeriod: scaleInCooldownPeriod, + Adjustment: scaleInAdjustment, + LastScaleInTime: &lastScaleInTime, + }, } if m := os.Getenv(`CLOUDWATCH_METRICS`); m == `true` || m == `1` { diff --git a/main.go b/main.go index 0a1236f..eb6985c 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "log" + "time" "github.com/buildkite/buildkite-agent-scaler/buildkite" "github.com/buildkite/buildkite-agent-scaler/scaler" @@ -19,6 +20,9 @@ func main() { buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics") buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token") + // scale in params + scaleInAdjustment = flag.Int64("scale-in-adjustment", -1, "Maximum adjustment to the desired capacity on scale in") + // general params dryRun = flag.Bool("dry-run", false, "Whether to just show what would be done") ) @@ -32,6 +36,12 @@ func main() { 
AgentsPerInstance: *agentsPerInstance, PublishCloudWatchMetrics: *cwMetrics, DryRun: *dryRun, + ScaleInParams: scaler.ScaleInParams{ + // We run in one-shot so cooldown isn't implemented + CooldownPeriod: time.Duration(0), + Adjustment: *scaleInAdjustment, + LastScaleInTime: &time.Time{}, + }, }) if err != nil { log.Fatal(err) diff --git a/scaler/scaler.go b/scaler/scaler.go index e2d82a4..4ef9f98 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -8,6 +8,12 @@ import ( "github.com/buildkite/buildkite-agent-scaler/buildkite" ) +type ScaleInParams struct { + CooldownPeriod time.Duration + Adjustment int64 + LastScaleInTime *time.Time +} + type Params struct { AutoScalingGroupName string AgentsPerInstance int @@ -16,6 +22,8 @@ type Params struct { UserAgent string PublishCloudWatchMetrics bool DryRun bool + + ScaleInParams ScaleInParams } type Scaler struct { @@ -30,6 +38,7 @@ type Scaler struct { Publish(metrics map[string]int64) error } agentsPerInstance int + scaleInParams ScaleInParams } func NewScaler(bk *buildkite.Client, params Params) (*Scaler, error) { @@ -39,6 +48,7 @@ func NewScaler(bk *buildkite.Client, params Params) (*Scaler, error) { queue: params.BuildkiteQueue, }, agentsPerInstance: params.AgentsPerInstance, + scaleInParams: params.ScaleInParams, } orgSlug, err := bk.GetOrgSlug() @@ -114,6 +124,18 @@ func (s *Scaler) Run() error { log.Printf("↳ Set desired to %d (took %v)", desired, time.Now().Sub(t)) } else if current.DesiredCount > desired { + cooldownRemaining := s.scaleInParams.CooldownPeriod - time.Since(*s.scaleInParams.LastScaleInTime) + + if cooldownRemaining > 0 { + log.Printf("⏲ Want to scale IN but in cooldown for %d seconds", cooldownRemaining/time.Second) + return nil + } + + minimumDesired := current.DesiredCount + s.scaleInParams.Adjustment + if desired < minimumDesired { + desired = minimumDesired + } + log.Printf("Scaling IN 📉 to %d instances (currently %d)", desired, current.DesiredCount) err = 
s.autoscaling.SetDesiredCapacity(desired) @@ -121,6 +143,7 @@ func (s *Scaler) Run() error { return err } + *s.scaleInParams.LastScaleInTime = time.Now() log.Printf("↳ Set desired to %d (took %v)", desired, time.Now().Sub(t)) } else { log.Printf("No scaling required, currently %d", current.DesiredCount) diff --git a/scaler/scaler_test.go b/scaler/scaler_test.go index dbf011f..6688378 100644 --- a/scaler/scaler_test.go +++ b/scaler/scaler_test.go @@ -2,9 +2,10 @@ package scaler import ( "testing" + "time" ) -func TestScalingWithoutError(t *testing.T) { +func TestScalingOutWithoutError(t *testing.T) { for _, tc := range []struct { ScheduledJobs int64 AgentsPerInstance int @@ -37,6 +38,71 @@ func TestScalingWithoutError(t *testing.T) { } } +func TestScalingInWithoutError(t *testing.T) { + testCases := []struct { + currentDesiredCapacity int64 + coolDownPeriod time.Duration + lastScaleInTime time.Time + adjustment int64 + + expectedDesiredCapacity int64 + }{ + { + currentDesiredCapacity: 10, + coolDownPeriod: 5 * time.Minute, + lastScaleInTime: time.Now(), + adjustment: -1, + + // We're inside cooldown + expectedDesiredCapacity: 10, + }, + { + currentDesiredCapacity: 10, + coolDownPeriod: 5 * time.Minute, + lastScaleInTime: time.Now().Add(-10 * time.Minute), + adjustment: -2, + + // We're out of cooldown but we can only adjust by -2 + expectedDesiredCapacity: 8, + }, + { + currentDesiredCapacity: 10, + coolDownPeriod: 5 * time.Minute, + lastScaleInTime: time.Now().Add(-10 * time.Minute), + adjustment: -100, + + // We're allowed to adjust the whole amount + expectedDesiredCapacity: 0, + }, + } + + for _, tc := range testCases { + t.Run("", func(t *testing.T) { + asg := &asgTestDriver{desiredCapacity: tc.currentDesiredCapacity} + s := Scaler{ + autoscaling: asg, + bk: &buildkiteTestDriver{count: 0}, + agentsPerInstance: 1, + scaleInParams: ScaleInParams{ + CooldownPeriod: tc.coolDownPeriod, + Adjustment: tc.adjustment, + LastScaleInTime: &tc.lastScaleInTime, + }, + } + + 
if err := s.Run(); err != nil { + t.Fatal(err) + } + + if asg.desiredCapacity != tc.expectedDesiredCapacity { + t.Fatalf("Expected desired capacity of %d, got %d", + tc.expectedDesiredCapacity, asg.desiredCapacity, + ) + } + }) + } +} + type buildkiteTestDriver struct { count int64 err error @@ -52,7 +118,11 @@ type asgTestDriver struct { } func (d *asgTestDriver) Describe() (AutoscaleGroupDetails, error) { - return AutoscaleGroupDetails{MinSize: 0, MaxSize: 100}, nil + return AutoscaleGroupDetails{ + DesiredCount: d.desiredCapacity, + MinSize: 0, + MaxSize: 100, + }, nil } func (d *asgTestDriver) SetDesiredCapacity(count int64) error { diff --git a/template.yaml b/template.yaml index 488072e..05a1ad2 100644 --- a/template.yaml +++ b/template.yaml @@ -19,6 +19,17 @@ Parameters: Type: String Default: default + ScaleInCooldownPeriod: + Description: Cooldown period between scale in events + Type: String + Default: 5m + + ScaleInAdjustment: + Description: Maximum adjustment to the desired capacity on scale in + Type: Number + MaxValue: -1 + Default: -1 + Mappings: LambdaBucket: us-east-1 : { Bucket: "buildkite-lambdas" } @@ -97,11 +108,13 @@ Resources: MemorySize: 128 Environment: Variables: - BUILDKITE_AGENT_TOKEN: !Ref BuildkiteAgentToken - BUILDKITE_QUEUE: !Ref BuildkiteQueue - ASG_NAME: !Ref AgentAutoScaleGroup - LAMBDA_TIMEOUT: 1m - LAMBDA_INTERVAL: 20s + BUILDKITE_AGENT_TOKEN: !Ref BuildkiteAgentToken + BUILDKITE_QUEUE: !Ref BuildkiteQueue + ASG_NAME: !Ref AgentAutoScaleGroup + SCALE_IN_COOLDOWN_PERIOD: !Ref ScaleInCooldownPeriod + SCALE_IN_ADJUSTMENT: !Ref ScaleInAdjustment + LAMBDA_TIMEOUT: 1m + LAMBDA_INTERVAL: 20s AutoscalingLambdaScheduledRule: Type: "AWS::Events::Rule"