Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scale-in cooldown support #6

Merged
merged 1 commit into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions lambda/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (

var invokeCount = 0

// lastScaleInTime stores the last time we scaled in the ASG. Handler passes a
// pointer to it to the scaler through ScaleInParams.LastScaleInTime.
// On a cold start this is reset to the zero time.Time (January 1, year 1 UTC,
// not the Unix epoch), so any realistic cooldown period has already elapsed.
var lastScaleInTime time.Time

func main() {
if os.Getenv(`DEBUG`) != "" {
_, err := Handler(context.Background(), json.RawMessage([]byte{}))
Expand All @@ -35,6 +39,8 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {

var timeout <-chan time.Time = make(chan time.Time)
var interval time.Duration = 10 * time.Second
var scaleInCooldownPeriod time.Duration = 5 * time.Minute
var scaleInAdjustment int64 = -1

if intervalStr := os.Getenv(`LAMBDA_INTERVAL`); intervalStr != "" {
var err error
Expand All @@ -52,6 +58,26 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
timeout = time.After(timeoutDuration)
}

if scaleInCooldownPeriodStr := os.Getenv(`SCALE_IN_COOLDOWN_PERIOD`); scaleInCooldownPeriodStr != "" {
var err error
scaleInCooldownPeriod, err = time.ParseDuration(scaleInCooldownPeriodStr)
if err != nil {
return "", err
}
}

if scaleInAdjustmentStr := os.Getenv(`SCALE_IN_ADJUSTMENT`); scaleInAdjustmentStr != "" {
var err error
scaleInAdjustment, err = strconv.ParseInt(scaleInAdjustmentStr, 10, 64)
if err != nil {
return "", err
}

if scaleInAdjustment >= 0 {
panic(fmt.Sprintf("Scale in adjustment (%d) must be negative", scaleInAdjustment))
}
}

var mustGetEnv = func(env string) string {
val := os.Getenv(env)
if val == "" {
Expand Down Expand Up @@ -80,6 +106,11 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
BuildkiteQueue: mustGetEnv(`BUILDKITE_QUEUE`),
AutoScalingGroupName: mustGetEnv(`ASG_NAME`),
AgentsPerInstance: mustGetEnvInt(`AGENTS_PER_INSTANCE`),
ScaleInParams: scaler.ScaleInParams{
CooldownPeriod: scaleInCooldownPeriod,
Adjustment: scaleInAdjustment,
LastScaleInTime: &lastScaleInTime,
},
}

if m := os.Getenv(`CLOUDWATCH_METRICS`); m == `true` || m == `1` {
Expand Down
10 changes: 10 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"flag"
"log"
"time"

"github.com/buildkite/buildkite-agent-scaler/buildkite"
"github.com/buildkite/buildkite-agent-scaler/scaler"
Expand All @@ -19,6 +20,9 @@ func main() {
buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics")
buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token")

// scale in params
scaleInAdjustment = flag.Int64("scale-in-adjustment", -1, "Maximum adjustment to the desired capacity on scale in")

// general params
dryRun = flag.Bool("dry-run", false, "Whether to just show what would be done")
)
Expand All @@ -32,6 +36,12 @@ func main() {
AgentsPerInstance: *agentsPerInstance,
PublishCloudWatchMetrics: *cwMetrics,
DryRun: *dryRun,
ScaleInParams: scaler.ScaleInParams{
// We run in one-shot so cooldown isn't implemented
CooldownPeriod: time.Duration(0),
Adjustment: *scaleInAdjustment,
LastScaleInTime: &time.Time{},
},
})
if err != nil {
log.Fatal(err)
Expand Down
23 changes: 23 additions & 0 deletions scaler/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ import (
"github.com/buildkite/buildkite-agent-scaler/buildkite"
)

// ScaleInParams controls how the Scaler reduces the desired capacity.
type ScaleInParams struct {
// CooldownPeriod is the minimum time that must have passed since
// LastScaleInTime before Run will scale in again.
CooldownPeriod time.Duration
// Adjustment is added to the current desired count to get the lowest
// capacity a single scale-in may set. It is expected to be negative (the
// Lambda entry point rejects values >= 0).
Adjustment int64
// LastScaleInTime points at the time of the previous scale-in. Run reads it
// for the cooldown check and overwrites it with time.Now() after a
// successful scale-in.
LastScaleInTime *time.Time
}

type Params struct {
AutoScalingGroupName string
AgentsPerInstance int
Expand All @@ -16,6 +22,8 @@ type Params struct {
UserAgent string
PublishCloudWatchMetrics bool
DryRun bool

ScaleInParams ScaleInParams
}

type Scaler struct {
Expand All @@ -30,6 +38,7 @@ type Scaler struct {
Publish(metrics map[string]int64) error
}
agentsPerInstance int
scaleInParams ScaleInParams
}

func NewScaler(bk *buildkite.Client, params Params) (*Scaler, error) {
Expand All @@ -39,6 +48,7 @@ func NewScaler(bk *buildkite.Client, params Params) (*Scaler, error) {
queue: params.BuildkiteQueue,
},
agentsPerInstance: params.AgentsPerInstance,
scaleInParams: params.ScaleInParams,
}

orgSlug, err := bk.GetOrgSlug()
Expand Down Expand Up @@ -114,13 +124,26 @@ func (s *Scaler) Run() error {

log.Printf("↳ Set desired to %d (took %v)", desired, time.Now().Sub(t))
} else if current.DesiredCount > desired {
cooldownRemaining := s.scaleInParams.CooldownPeriod - time.Since(*s.scaleInParams.LastScaleInTime)

if cooldownRemaining > 0 {
log.Printf("⏲ Want to scale IN but in cooldown for %d seconds", cooldownRemaining/time.Second)
return nil
}

minimumDesired := current.DesiredCount + s.scaleInParams.Adjustment
if desired < minimumDesired {
desired = minimumDesired
}

log.Printf("Scaling IN 📉 to %d instances (currently %d)", desired, current.DesiredCount)

err = s.autoscaling.SetDesiredCapacity(desired)
if err != nil {
return err
}

*s.scaleInParams.LastScaleInTime = time.Now()
log.Printf("↳ Set desired to %d (took %v)", desired, time.Now().Sub(t))
} else {
log.Printf("No scaling required, currently %d", current.DesiredCount)
Expand Down
74 changes: 72 additions & 2 deletions scaler/scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package scaler

import (
"testing"
"time"
)

func TestScalingWithoutError(t *testing.T) {
func TestScalingOutWithoutError(t *testing.T) {
for _, tc := range []struct {
ScheduledJobs int64
AgentsPerInstance int
Expand Down Expand Up @@ -37,6 +38,71 @@ func TestScalingWithoutError(t *testing.T) {
}
}

// TestScalingInWithoutError drives Scaler.Run through its scale-in branch.
// The stub Buildkite driver is given a count of 0 and agentsPerInstance is 1;
// each scenario then checks the capacity left on the stub ASG afterwards.
func TestScalingInWithoutError(t *testing.T) {
	type scenario struct {
		startCapacity int64
		cooldown      time.Duration
		lastScaleIn   time.Time
		maxStep       int64
		wantCapacity  int64
	}

	scenarios := []scenario{
		// Last scale-in was just now, so we are still inside the cooldown
		// window and the capacity must not move.
		{
			startCapacity: 10,
			cooldown:      5 * time.Minute,
			lastScaleIn:   time.Now(),
			maxStep:       -1,
			wantCapacity:  10,
		},
		// Cooldown has elapsed, but one run may only drop capacity by 2.
		{
			startCapacity: 10,
			cooldown:      5 * time.Minute,
			lastScaleIn:   time.Now().Add(-10 * time.Minute),
			maxStep:       -2,
			wantCapacity:  8,
		},
		// Cooldown has elapsed and the step is large enough to go all the
		// way down in one run.
		{
			startCapacity: 10,
			cooldown:      5 * time.Minute,
			lastScaleIn:   time.Now().Add(-10 * time.Minute),
			maxStep:       -100,
			wantCapacity:  0,
		},
	}

	for _, sc := range scenarios {
		t.Run("", func(t *testing.T) {
			group := &asgTestDriver{desiredCapacity: sc.startCapacity}
			params := ScaleInParams{
				CooldownPeriod:  sc.cooldown,
				Adjustment:      sc.maxStep,
				LastScaleInTime: &sc.lastScaleIn,
			}
			subject := Scaler{
				autoscaling:       group,
				bk:                &buildkiteTestDriver{count: 0},
				agentsPerInstance: 1,
				scaleInParams:     params,
			}

			err := subject.Run()
			if err != nil {
				t.Fatal(err)
			}

			if got, want := group.desiredCapacity, sc.wantCapacity; got != want {
				t.Fatalf("Expected desired capacity of %d, got %d", want, got)
			}
		})
	}
}

type buildkiteTestDriver struct {
count int64
err error
Expand All @@ -52,7 +118,11 @@ type asgTestDriver struct {
}

// Describe reports the stubbed auto scaling group state: the driver's current
// desiredCapacity as DesiredCount, with fixed bounds of MinSize 0 and
// MaxSize 100. It never returns an error.
//
// The earlier bare return of only MinSize/MaxSize is dropped: it made the
// return below unreachable, so DesiredCount was always reported as 0.
func (d *asgTestDriver) Describe() (AutoscaleGroupDetails, error) {
	return AutoscaleGroupDetails{
		DesiredCount: d.desiredCapacity,
		MinSize:      0,
		MaxSize:      100,
	}, nil
}

func (d *asgTestDriver) SetDesiredCapacity(count int64) error {
Expand Down
23 changes: 18 additions & 5 deletions template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ Parameters:
Type: String
Default: default

# Passed to the Lambda as SCALE_IN_COOLDOWN_PERIOD and parsed there with Go's
# time.ParseDuration, so the value must be a duration string such as 5m or 90s.
ScaleInCooldownPeriod:
Description: Cooldown period between scale in events
Type: String
Default: 5m

# Passed to the Lambda as SCALE_IN_ADJUSTMENT. The value must be negative:
# MaxValue caps it at -1, and the Lambda handler panics on a value >= 0.
ScaleInAdjustment:
Description: Maximum adjustment to the desired capacity on scale in
Type: Number
MaxValue: -1
Default: -1

Mappings:
LambdaBucket:
us-east-1 : { Bucket: "buildkite-lambdas" }
Expand Down Expand Up @@ -97,11 +108,13 @@ Resources:
MemorySize: 128
Environment:
Variables:
BUILDKITE_AGENT_TOKEN: !Ref BuildkiteAgentToken
BUILDKITE_QUEUE: !Ref BuildkiteQueue
ASG_NAME: !Ref AgentAutoScaleGroup
LAMBDA_TIMEOUT: 1m
LAMBDA_INTERVAL: 20s
BUILDKITE_AGENT_TOKEN: !Ref BuildkiteAgentToken
BUILDKITE_QUEUE: !Ref BuildkiteQueue
ASG_NAME: !Ref AgentAutoScaleGroup
SCALE_IN_COOLDOWN_PERIOD: !Ref ScaleInCooldownPeriod
SCALE_IN_ADJUSTMENT: !Ref ScaleInAdjustment
LAMBDA_TIMEOUT: 1m
LAMBDA_INTERVAL: 20s

AutoscalingLambdaScheduledRule:
Type: "AWS::Events::Rule"
Expand Down