Skip to content

Commit

Permalink
Merge pull request #6 from etaoins/add-scale-in-cooldown-support
Browse files Browse the repository at this point in the history
Add scale-in cooldown support
  • Loading branch information
lox authored Mar 25, 2019
2 parents c2c4bdb + c9f6a0c commit e3e571a
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 7 deletions.
31 changes: 31 additions & 0 deletions lambda/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (

var invokeCount = 0

// lastScaleDownTime stores the last time we scaled down the ASG
// On a cold start this will be reset to Jan 1st, 1970
var lastScaleInTime time.Time

func main() {
if os.Getenv(`DEBUG`) != "" {
_, err := Handler(context.Background(), json.RawMessage([]byte{}))
Expand All @@ -35,6 +39,8 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {

var timeout <-chan time.Time = make(chan time.Time)
var interval time.Duration = 10 * time.Second
var scaleInCooldownPeriod time.Duration = 5 * time.Minute
var scaleInAdjustment int64 = -1

if intervalStr := os.Getenv(`LAMBDA_INTERVAL`); intervalStr != "" {
var err error
Expand All @@ -52,6 +58,26 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
timeout = time.After(timeoutDuration)
}

if scaleInCooldownPeriodStr := os.Getenv(`SCALE_IN_COOLDOWN_PERIOD`); scaleInCooldownPeriodStr != "" {
var err error
scaleInCooldownPeriod, err = time.ParseDuration(scaleInCooldownPeriodStr)
if err != nil {
return "", err
}
}

if scaleInAdjustmentStr := os.Getenv(`SCALE_IN_ADJUSTMENT`); scaleInAdjustmentStr != "" {
var err error
scaleInAdjustment, err = strconv.ParseInt(scaleInAdjustmentStr, 10, 64)
if err != nil {
return "", err
}

if scaleInAdjustment >= 0 {
panic(fmt.Sprintf("Scale in adjustment (%d) must be negative", scaleInAdjustment))
}
}

var mustGetEnv = func(env string) string {
val := os.Getenv(env)
if val == "" {
Expand Down Expand Up @@ -80,6 +106,11 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
BuildkiteQueue: mustGetEnv(`BUILDKITE_QUEUE`),
AutoScalingGroupName: mustGetEnv(`ASG_NAME`),
AgentsPerInstance: mustGetEnvInt(`AGENTS_PER_INSTANCE`),
ScaleInParams: scaler.ScaleInParams{
CooldownPeriod: scaleInCooldownPeriod,
Adjustment: scaleInAdjustment,
LastScaleInTime: &lastScaleInTime,
},
}

if m := os.Getenv(`CLOUDWATCH_METRICS`); m == `true` || m == `1` {
Expand Down
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"log"
"time"

"github.com/buildkite/buildkite-agent-scaler/buildkite"
"github.com/buildkite/buildkite-agent-scaler/scaler"
Expand All @@ -19,6 +20,9 @@ func main() {
buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics")
buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token")

// scale in params
scaleInAdjustment = flag.Int64("scale-in-adjustment", -1, "Maximum adjustment to the desired capacity on scale in")

// general params
dryRun = flag.Bool("dry-run", false, "Whether to just show what would be done")
)
Expand All @@ -32,6 +36,12 @@ func main() {
AgentsPerInstance: *agentsPerInstance,
PublishCloudWatchMetrics: *cwMetrics,
DryRun: *dryRun,
ScaleInParams: scaler.ScaleInParams{
// We run in one-shot so cooldown isn't implemented
CooldownPeriod: time.Duration(0),
Adjustment: *scaleInAdjustment,
LastScaleInTime: &time.Time{},
},
})
if err != nil {
log.Fatal(err)
Expand Down
23 changes: 23 additions & 0 deletions scaler/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"github.com/buildkite/buildkite-agent-scaler/buildkite"
)

type ScaleInParams struct {
CooldownPeriod time.Duration
Adjustment int64
LastScaleInTime *time.Time
}

type Params struct {
AutoScalingGroupName string
AgentsPerInstance int
Expand All @@ -16,6 +22,8 @@ type Params struct {
UserAgent string
PublishCloudWatchMetrics bool
DryRun bool

ScaleInParams ScaleInParams
}

type Scaler struct {
Expand All @@ -30,6 +38,7 @@ type Scaler struct {
Publish(metrics map[string]int64) error
}
agentsPerInstance int
scaleInParams ScaleInParams
}

func NewScaler(bk *buildkite.Client, params Params) (*Scaler, error) {
Expand All @@ -39,6 +48,7 @@ func NewScaler(bk *buildkite.Client, params Params) (*Scaler, error) {
queue: params.BuildkiteQueue,
},
agentsPerInstance: params.AgentsPerInstance,
scaleInParams: params.ScaleInParams,
}

orgSlug, err := bk.GetOrgSlug()
Expand Down Expand Up @@ -114,13 +124,26 @@ func (s *Scaler) Run() error {

log.Printf("↳ Set desired to %d (took %v)", desired, time.Now().Sub(t))
} else if current.DesiredCount > desired {
cooldownRemaining := s.scaleInParams.CooldownPeriod - time.Since(*s.scaleInParams.LastScaleInTime)

if cooldownRemaining > 0 {
log.Printf("⏲ Want to scale IN but in cooldown for %d seconds", cooldownRemaining/time.Second)
return nil
}

minimumDesired := current.DesiredCount + s.scaleInParams.Adjustment
if desired < minimumDesired {
desired = minimumDesired
}

log.Printf("Scaling IN 📉 to %d instances (currently %d)", desired, current.DesiredCount)

err = s.autoscaling.SetDesiredCapacity(desired)
if err != nil {
return err
}

*s.scaleInParams.LastScaleInTime = time.Now()
log.Printf("↳ Set desired to %d (took %v)", desired, time.Now().Sub(t))
} else {
log.Printf("No scaling required, currently %d", current.DesiredCount)
Expand Down
74 changes: 72 additions & 2 deletions scaler/scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package scaler

import (
"testing"
"time"
)

func TestScalingWithoutError(t *testing.T) {
func TestScalingOutWithoutError(t *testing.T) {
for _, tc := range []struct {
ScheduledJobs int64
AgentsPerInstance int
Expand Down Expand Up @@ -37,6 +38,71 @@ func TestScalingWithoutError(t *testing.T) {
}
}

func TestScalingInWithoutError(t *testing.T) {
testCases := []struct {
currentDesiredCapacity int64
coolDownPeriod time.Duration
lastScaleInTime time.Time
adjustment int64

expectedDesiredCapacity int64
}{
{
currentDesiredCapacity: 10,
coolDownPeriod: 5 * time.Minute,
lastScaleInTime: time.Now(),
adjustment: -1,

// We're inside cooldown
expectedDesiredCapacity: 10,
},
{
currentDesiredCapacity: 10,
coolDownPeriod: 5 * time.Minute,
lastScaleInTime: time.Now().Add(-10 * time.Minute),
adjustment: -2,

// We're out of cooldown but we can only adjust by -2
expectedDesiredCapacity: 8,
},
{
currentDesiredCapacity: 10,
coolDownPeriod: 5 * time.Minute,
lastScaleInTime: time.Now().Add(-10 * time.Minute),
adjustment: -100,

// We're allowed to adjust the whole amount
expectedDesiredCapacity: 0,
},
}

for _, tc := range testCases {
t.Run("", func(t *testing.T) {
asg := &asgTestDriver{desiredCapacity: tc.currentDesiredCapacity}
s := Scaler{
autoscaling: asg,
bk: &buildkiteTestDriver{count: 0},
agentsPerInstance: 1,
scaleInParams: ScaleInParams{
CooldownPeriod: tc.coolDownPeriod,
Adjustment: tc.adjustment,
LastScaleInTime: &tc.lastScaleInTime,
},
}

if err := s.Run(); err != nil {
t.Fatal(err)
}

if asg.desiredCapacity != tc.expectedDesiredCapacity {
t.Fatalf("Expected desired capacity of %d, got %d",
tc.expectedDesiredCapacity, asg.desiredCapacity,
)
}
})
}
}

type buildkiteTestDriver struct {
count int64
err error
Expand All @@ -52,7 +118,11 @@ type asgTestDriver struct {
}

func (d *asgTestDriver) Describe() (AutoscaleGroupDetails, error) {
return AutoscaleGroupDetails{MinSize: 0, MaxSize: 100}, nil
return AutoscaleGroupDetails{
DesiredCount: d.desiredCapacity,
MinSize: 0,
MaxSize: 100,
}, nil
}

func (d *asgTestDriver) SetDesiredCapacity(count int64) error {
Expand Down
23 changes: 18 additions & 5 deletions template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ Parameters:
Type: String
Default: default

ScaleInCooldownPeriod:
Description: Cooldown period between scale in events
Type: String
Default: 5m

ScaleInAdjustment:
Description: Maximum adjustment to the desired capacity on scale in
Type: Number
MaxValue: -1
Default: -1

Mappings:
LambdaBucket:
us-east-1 : { Bucket: "buildkite-lambdas" }
Expand Down Expand Up @@ -97,11 +108,13 @@ Resources:
MemorySize: 128
Environment:
Variables:
BUILDKITE_AGENT_TOKEN: !Ref BuildkiteAgentToken
BUILDKITE_QUEUE: !Ref BuildkiteQueue
ASG_NAME: !Ref AgentAutoScaleGroup
LAMBDA_TIMEOUT: 1m
LAMBDA_INTERVAL: 20s
BUILDKITE_AGENT_TOKEN: !Ref BuildkiteAgentToken
BUILDKITE_QUEUE: !Ref BuildkiteQueue
ASG_NAME: !Ref AgentAutoScaleGroup
SCALE_IN_COOLDOWN_PERIOD: !Ref ScaleInCooldownPeriod
SCALE_IN_ADJUSTMENT: !Ref ScaleInAdjustment
LAMBDA_TIMEOUT: 1m
LAMBDA_INTERVAL: 20s

AutoscalingLambdaScheduledRule:
Type: "AWS::Events::Rule"
Expand Down

0 comments on commit e3e571a

Please sign in to comment.