From 9e2fcc4931f9e94383359cbb3b87e0b72852849a Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Fri, 3 May 2019 15:05:29 +1000 Subject: [PATCH 1/6] Add a configurable scale factor to scale in/out --- lambda/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lambda/main.go b/lambda/main.go index 8750e6b..2e67a56 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -81,6 +81,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { if scaleOutCooldownPeriod, err = time.ParseDuration(v); err != nil { return "", err } + scaleOutFactor = math.Abs(scaleOutFactor) } if v := os.Getenv(`SCALE_OUT_FACTOR`); v != "" { @@ -177,6 +178,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { log.Printf("Increasing poll interval to %v based on rate limit", interval) } + // Persist the times back into the global state lastScaleIn = scaler.LastScaleIn() lastScaleOut = scaler.LastScaleOut() From 785cb3289d9ee6c662fee58ba02f8c01fd4a6a2d Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Sun, 5 May 2019 11:39:59 +0800 Subject: [PATCH 2/6] Refactor scaler test to use metrics and params --- scaler/scaler_test.go | 211 +++++++++++++++++++++++++++++------------- 1 file changed, 145 insertions(+), 66 deletions(-) diff --git a/scaler/scaler_test.go b/scaler/scaler_test.go index c3cbc86..dd40f96 100644 --- a/scaler/scaler_test.go +++ b/scaler/scaler_test.go @@ -8,106 +8,169 @@ import ( ) func TestScalingOutWithoutError(t *testing.T) { - metrics := buildkite.AgentMetrics{ - ScheduledJobs: 10, - RunningJobs: 2, - } - for _, tc := range []struct { - agentsPerInstance int + params Params + metrics buildkite.AgentMetrics currentDesiredCapacity int64 - params ScaleParams expectedDesiredCapacity int64 }{ // Basic scale out { - agentsPerInstance: 1, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + }, expectedDesiredCapacity: 12, }, // Scale-out with multiple agents per instance { - agentsPerInstance: 4, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 4, + }, expectedDesiredCapacity: 3, }, { - agentsPerInstance: 2, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 2, + }, expectedDesiredCapacity: 6, }, // Scale-out with multiple agents per instance // where it doesn't divide evenly { - agentsPerInstance: 5, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 5, + }, expectedDesiredCapacity: 3, }, { - agentsPerInstance: 20, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 20, + }, expectedDesiredCapacity: 1, }, // Scale-out with a factor of 50% { - agentsPerInstance: 1, - params: ScaleParams{ - Factor: 0.5, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + ScaleOutParams: ScaleParams{ + Factor: 0.5, + }, }, expectedDesiredCapacity: 6, }, // Scale-out with a factor of 10% { - agentsPerInstance: 1, - params: ScaleParams{ - Factor: 0.10, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + ScaleOutParams: ScaleParams{ + Factor: 0.10, + }, }, - currentDesiredCapacity: 11, + currentDesiredCapacity: 11, expectedDesiredCapacity: 12, }, // Cool-down period is enforced { - agentsPerInstance: 1, - params: ScaleParams{ - LastEvent: time.Now(), - CooldownPeriod: 5 * time.Minute, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + ScaleOutParams: ScaleParams{ + LastEvent: time.Now(), + CooldownPeriod: 5 * time.Minute, + }, }, currentDesiredCapacity: 4, expectedDesiredCapacity: 4, }, // Cool-down period is passed { - agentsPerInstance: 1, - params: ScaleParams{ - LastEvent: time.Now().Add(-10 * time.Minute), - CooldownPeriod: 5 * time.Minute, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + ScaleOutParams: ScaleParams{ + LastEvent: time.Now().Add(-10 * time.Minute), + CooldownPeriod: 5 * time.Minute, + }, }, currentDesiredCapacity: 4, expectedDesiredCapacity: 12, }, // Cool-down period is passed, factor is applied { - agentsPerInstance: 1, - params: ScaleParams{ - Factor: 2.0, - LastEvent: time.Now().Add(-10 * time.Minute), - CooldownPeriod: 5 * time.Minute, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + ScaleOutParams: ScaleParams{ + Factor: 2.0, + LastEvent: time.Now().Add(-10 * time.Minute), + CooldownPeriod: 5 * time.Minute, + }, }, currentDesiredCapacity: 4, expectedDesiredCapacity: 20, }, // Scale out disabled { - agentsPerInstance: 1, - params: ScaleParams{ - Disable: true, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + RunningJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + ScaleOutParams: ScaleParams{ + Disable: true, + }, }, currentDesiredCapacity: 1, expectedDesiredCapacity: 1, }, } { t.Run("", func(t *testing.T) { - asg := &asgTestDriver{desiredCapacity: tc.currentDesiredCapacity} + asg := &asgTestDriver{ + desiredCapacity: tc.currentDesiredCapacity, + } s := Scaler{ autoscaling: asg, - bk: &buildkiteTestDriver{metrics: metrics}, - agentsPerInstance: tc.agentsPerInstance, - scaleOutParams: tc.params, + bk: &buildkiteTestDriver{metrics: tc.metrics}, + agentsPerInstance: tc.params.AgentsPerInstance, + scaleOutParams: tc.params.ScaleOutParams, } if _, err := s.Run(); err != nil { @@ -125,51 +188,67 @@ func TestScalingOutWithoutError(t *testing.T) { func TestScalingInWithoutError(t *testing.T) { testCases := []struct { + params Params + metrics buildkite.AgentMetrics currentDesiredCapacity int64 - scheduledJobs int64 - params ScaleParams expectedDesiredCapacity int64 }{ // We're inside cooldown { - currentDesiredCapacity: 10, - params: ScaleParams{ - CooldownPeriod: 5 * time.Minute, - LastEvent: time.Now(), + params: Params{ + AgentsPerInstance: 1, + ScaleInParams: ScaleParams{ + CooldownPeriod: 5 * time.Minute, + LastEvent: time.Now(), + }, }, + currentDesiredCapacity: 10, expectedDesiredCapacity: 10, }, // We're out of cooldown, apply factor { - currentDesiredCapacity: 10, - params: ScaleParams{ - CooldownPeriod: 5 * time.Minute, - LastEvent: time.Now().Add(-10 * time.Minute), - Factor: 0.10, + params: Params{ + AgentsPerInstance: 1, + ScaleInParams: ScaleParams{ + CooldownPeriod: 5 * time.Minute, + LastEvent: time.Now().Add(-10 * time.Minute), + Factor: 0.10, + }, }, + currentDesiredCapacity: 10, expectedDesiredCapacity: 9, }, // With 500% factor, we scale all the way down despite scheduled jobs { - currentDesiredCapacity: 20, - scheduledJobs: 10, - params: ScaleParams{ - Factor: 5.0, + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 10, + }, + params: Params{ + AgentsPerInstance: 1, + ScaleInParams: ScaleParams{ + Factor: 5.0, + }, }, + currentDesiredCapacity: 20, expectedDesiredCapacity: 0, }, // Make sure we round down so we eventually reach zero { - currentDesiredCapacity: 1, - params: ScaleParams{ - Factor: 0.10, + params: Params{ + AgentsPerInstance: 1, + ScaleInParams: ScaleParams{ + Factor: 0.10, + }, }, + currentDesiredCapacity: 1, expectedDesiredCapacity: 0, }, // Scale in disabled { - params: ScaleParams{ - Disable: true, + params: Params{ + ScaleInParams: ScaleParams{ + Disable: true, + }, }, currentDesiredCapacity: 1, expectedDesiredCapacity: 1, @@ -178,14 +257,14 @@ func TestScalingInWithoutError(t *testing.T) { for _, tc := range testCases { t.Run("", func(t *testing.T) { - asg := &asgTestDriver{desiredCapacity: tc.currentDesiredCapacity} + asg := &asgTestDriver{ + desiredCapacity: tc.currentDesiredCapacity, + } s := Scaler{ - autoscaling: asg, - bk: &buildkiteTestDriver{metrics: buildkite.AgentMetrics{ - ScheduledJobs: tc.scheduledJobs, - }}, - agentsPerInstance: 1, - scaleInParams: tc.params, + autoscaling: asg, + bk: &buildkiteTestDriver{metrics: tc.metrics}, + agentsPerInstance: tc.params.AgentsPerInstance, + scaleInParams: tc.params.ScaleInParams, } if _, err := s.Run(); err != nil { From 0b61903004d9976a5e3ca32d87fd786c4f14697d Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Sun, 5 May 2019 11:47:59 +0800 Subject: [PATCH 3/6] Support scaling including waiting jobs --- buildkite/buildkite.go | 7 +++++-- lambda/main.go | 10 +++++++++- main.go | 6 ++++-- scaler/scaler.go | 10 ++++++++++ scaler/scaler_test.go | 17 ++++++++++++++++- 5 files changed, 44 insertions(+), 6 deletions(-) diff --git a/buildkite/buildkite.go b/buildkite/buildkite.go index 302b4e9..0eb3d2b 100644 --- a/buildkite/buildkite.go +++ b/buildkite/buildkite.go @@ -37,6 +37,7 @@ type AgentMetrics struct { ScheduledJobs int64 RunningJobs int64 PollDuration time.Duration + WaitingJobs int64 } func (c *Client) GetAgentMetrics(queue string) (AgentMetrics, error) { @@ -50,6 +51,7 @@ func (c *Client) GetAgentMetrics(queue string) (AgentMetrics, error) { Queues map[string]struct { Scheduled int64 `json:"scheduled"` Running int64 `json:"running"` + Waiting int64 `json:"waiting"` } `json:"queues"` } `json:"jobs"` } @@ -70,10 +72,11 @@ func (c *Client) GetAgentMetrics(queue string) (AgentMetrics, error) { if queue, exists := resp.Jobs.Queues[queue]; exists { metrics.ScheduledJobs = queue.Scheduled metrics.RunningJobs = queue.Running + metrics.WaitingJobs = queue.Waiting } - log.Printf("↳ Got scheduled=%d, running=%d (took %v)", - metrics.ScheduledJobs, metrics.RunningJobs, queryDuration) + log.Printf("↳ Got scheduled=%d, running=%d, waiting=%d (took %v)", + metrics.ScheduledJobs, metrics.RunningJobs, metrics.WaitingJobs, queryDuration) return metrics, nil } diff --git a/lambda/main.go b/lambda/main.go index 2e67a56..a101b6d 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -47,7 +47,8 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { scaleOutCooldownPeriod time.Duration scaleOutFactor float64 - err error + ignoreWaiting bool + err error ) if v := os.Getenv(`LAMBDA_INTERVAL`); v != "" { @@ -91,6 +92,12 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { scaleOutFactor = math.Abs(scaleOutFactor) } + if v := os.Getenv(`IGNORE_WAITING`); v != "" { + if v == "true" || v == "1" { + ignoreWaiting = true + } + } + var mustGetEnv = func(env string) string { val := os.Getenv(env) if val == "" { @@ -136,6 +143,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { BuildkiteQueue: mustGetEnv(`BUILDKITE_QUEUE`), AutoScalingGroupName: mustGetEnv(`ASG_NAME`), AgentsPerInstance: mustGetEnvInt(`AGENTS_PER_INSTANCE`), + IgnoreWaiting: ignoreWaiting, ScaleInParams: scaler.ScaleParams{ CooldownPeriod: scaleInCooldownPeriod, Factor: scaleInFactor, diff --git a/main.go b/main.go index 16eba67..5f4235a 100644 --- a/main.go +++ b/main.go @@ -17,8 +17,9 @@ func main() { ssmTokenKey = flag.String("agent-token-ssm-key", "", "The AWS SSM Parameter Store key for the agent token") // buildkite params - buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics") - buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token") + buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics") + buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token") + buildkiteIgnoreWaiting = flag.Bool("ignore-waiting", false, "Whether to ignore jobs behind a wait step for scaling") // scale in/out params scaleInFactor = flag.Float64("scale-in-factor", 1.0, "A factor to apply to scale ins") @@ -45,6 +46,7 @@ func main() { AgentsPerInstance: *agentsPerInstance, PublishCloudWatchMetrics: *cwMetrics, DryRun: *dryRun, + IgnoreWaiting: *buildkiteIgnoreWaiting, ScaleInParams: scaler.ScaleParams{Factor: *scaleInFactor}, ScaleOutParams: scaler.ScaleParams{Factor: *scaleOutFactor}, }) diff --git a/scaler/scaler.go b/scaler/scaler.go index 2def2d3..c5e03de 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -23,6 +23,7 @@ type Params struct { UserAgent string PublishCloudWatchMetrics bool DryRun bool + IgnoreWaiting bool ScaleInParams ScaleParams ScaleOutParams ScaleParams } @@ -38,6 +39,7 @@ type Scaler struct { metrics interface { Publish(orgSlug, queue string, metrics map[string]int64) error } + ignoreWaiting bool agentsPerInstance int scaleInParams ScaleParams scaleOutParams ScaleParams @@ -49,6 +51,7 @@ func NewScaler(client *buildkite.Client, params Params) (*Scaler, error) { client: client, queue: params.BuildkiteQueue, }, + ignoreWaiting: params.IgnoreWaiting, agentsPerInstance: params.AgentsPerInstance, scaleInParams: params.ScaleInParams, scaleOutParams: params.ScaleOutParams, @@ -83,6 +86,7 @@ func (s *Scaler) Run() (time.Duration, error) { err = s.metrics.Publish(metrics.OrgSlug, metrics.Queue, map[string]int64{ `ScheduledJobsCount`: metrics.ScheduledJobs, `RunningJobsCount`: metrics.RunningJobs, + `WaitingJobsCount`: metrics.WaitingJobs, }) if err != nil { return metrics.PollDuration, err @@ -91,6 +95,12 @@ func (s *Scaler) Run() (time.Duration, error) { count := metrics.ScheduledJobs + metrics.RunningJobs + if !s.ignoreWaiting { + count += metrics.WaitingJobs + } else { + log.Printf("💸 Ignoring %d waiting jobs", metrics.WaitingJobs) + } + var desired int64 if count > 0 { desired = int64(math.Ceil(float64(count) / float64(s.agentsPerInstance))) diff --git a/scaler/scaler_test.go b/scaler/scaler_test.go index dd40f96..0aa308e 100644 --- a/scaler/scaler_test.go +++ b/scaler/scaler_test.go @@ -14,11 +14,25 @@ func TestScalingOutWithoutError(t *testing.T) { currentDesiredCapacity int64 expectedDesiredCapacity int64 }{ - // Basic scale out + // Basic scale out without waiting jobs { metrics: buildkite.AgentMetrics{ ScheduledJobs: 10, RunningJobs: 2, + WaitingJobs: 2, + }, + params: Params{ + AgentsPerInstance: 1, + IgnoreWaiting: true, + }, + expectedDesiredCapacity: 12, + }, + // Basic scale out with waiting jobs + { + metrics: buildkite.AgentMetrics{ + ScheduledJobs: 8, + RunningJobs: 2, + WaitingJobs: 2, }, params: Params{ AgentsPerInstance: 1, @@ -171,6 +185,7 @@ func TestScalingOutWithoutError(t *testing.T) { bk: &buildkiteTestDriver{metrics: tc.metrics}, agentsPerInstance: tc.params.AgentsPerInstance, scaleOutParams: tc.params.ScaleOutParams, + ignoreWaiting: tc.params.IgnoreWaiting, } if _, err := s.Run(); err != nil { From b99a63a64622561c38ca9f1527f9ecc1165fbed8 Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Sun, 5 May 2019 15:19:13 +0800 Subject: [PATCH 4/6] Fix bad merge --- lambda/main.go | 1 - 1 file changed, 1 deletion(-) diff --git a/lambda/main.go b/lambda/main.go index a101b6d..43dfb8f 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -82,7 +82,6 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { if scaleOutCooldownPeriod, err = time.ParseDuration(v); err != nil { return "", err } - scaleOutFactor = math.Abs(scaleOutFactor) } if v := os.Getenv(`SCALE_OUT_FACTOR`); v != "" { From db73707b757c2ac1c33703888eeb46ab31ee464b Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Sun, 5 May 2019 15:31:33 +0800 Subject: [PATCH 5/6] Exclude waiting jobs by default --- lambda/main.go | 10 +++++----- main.go | 8 ++++---- scaler/scaler.go | 8 ++++---- scaler/scaler_test.go | 4 ++-- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/lambda/main.go b/lambda/main.go index 43dfb8f..b01f688 100644 --- a/lambda/main.go +++ b/lambda/main.go @@ -47,8 +47,8 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { scaleOutCooldownPeriod time.Duration scaleOutFactor float64 - ignoreWaiting bool - err error + includeWaiting bool + err error ) if v := os.Getenv(`LAMBDA_INTERVAL`); v != "" { @@ -91,9 +91,9 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { scaleOutFactor = math.Abs(scaleOutFactor) } - if v := os.Getenv(`IGNORE_WAITING`); v != "" { + if v := os.Getenv(`INCLUDE_WAITING`); v != "" { if v == "true" || v == "1" { - ignoreWaiting = true + includeWaiting = true } } @@ -142,7 +142,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) { BuildkiteQueue: mustGetEnv(`BUILDKITE_QUEUE`), AutoScalingGroupName: mustGetEnv(`ASG_NAME`), AgentsPerInstance: mustGetEnvInt(`AGENTS_PER_INSTANCE`), - IgnoreWaiting: ignoreWaiting, + IncludeWaiting: includeWaiting, ScaleInParams: scaler.ScaleParams{ CooldownPeriod: scaleInCooldownPeriod, Factor: scaleInFactor, diff --git a/main.go b/main.go index 5f4235a..b64f986 100644 --- a/main.go +++ b/main.go @@ -17,9 +17,9 @@ func main() { ssmTokenKey = flag.String("agent-token-ssm-key", "", "The AWS SSM Parameter Store key for the agent token") // buildkite params - buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics") - buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token") - buildkiteIgnoreWaiting = flag.Bool("ignore-waiting", false, "Whether to ignore jobs behind a wait step for scaling") + buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics") + buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token") + includeWaiting = flag.Bool("include-waiting", false, "Whether to include jobs behind a wait step for scaling") // scale in/out params scaleInFactor = flag.Float64("scale-in-factor", 1.0, "A factor to apply to scale ins") @@ -46,7 +46,7 @@ func main() { AgentsPerInstance: *agentsPerInstance, PublishCloudWatchMetrics: *cwMetrics, DryRun: *dryRun, - IgnoreWaiting: *buildkiteIgnoreWaiting, + IncludeWaiting: *includeWaiting, ScaleInParams: scaler.ScaleParams{Factor: *scaleInFactor}, ScaleOutParams: scaler.ScaleParams{Factor: *scaleOutFactor}, }) diff --git a/scaler/scaler.go b/scaler/scaler.go index c5e03de..53d9092 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -23,7 +23,7 @@ type Params struct { UserAgent string PublishCloudWatchMetrics bool DryRun bool - IgnoreWaiting bool + IncludeWaiting bool ScaleInParams ScaleParams ScaleOutParams ScaleParams } @@ -39,7 +39,7 @@ type Scaler struct { metrics interface { Publish(orgSlug, queue string, metrics map[string]int64) error } - ignoreWaiting bool + includeWaiting bool agentsPerInstance int scaleInParams ScaleParams scaleOutParams ScaleParams @@ -51,7 +51,7 @@ func NewScaler(client *buildkite.Client, params Params) (*Scaler, error) { client: client, queue: params.BuildkiteQueue, }, - ignoreWaiting: params.IgnoreWaiting, + includeWaiting: params.IncludeWaiting, agentsPerInstance: params.AgentsPerInstance, scaleInParams: params.ScaleInParams, scaleOutParams: params.ScaleOutParams, @@ -95,7 +95,7 @@ func (s *Scaler) Run() (time.Duration, error) { count := metrics.ScheduledJobs + metrics.RunningJobs - if !s.ignoreWaiting { + if s.includeWaiting { count += metrics.WaitingJobs } else { log.Printf("💸 Ignoring %d waiting jobs", metrics.WaitingJobs) diff --git a/scaler/scaler_test.go b/scaler/scaler_test.go index 0aa308e..661213c 100644 --- a/scaler/scaler_test.go +++ b/scaler/scaler_test.go @@ -23,7 +23,6 @@ func TestScalingOutWithoutError(t *testing.T) { }, params: Params{ AgentsPerInstance: 1, - IgnoreWaiting: true, }, expectedDesiredCapacity: 12, }, @@ -36,6 +35,7 @@ func TestScalingOutWithoutError(t *testing.T) { }, params: Params{ AgentsPerInstance: 1, + IncludeWaiting: true, }, expectedDesiredCapacity: 12, }, @@ -185,7 +185,7 @@ func TestScalingOutWithoutError(t *testing.T) { bk: &buildkiteTestDriver{metrics: tc.metrics}, agentsPerInstance: tc.params.AgentsPerInstance, scaleOutParams: tc.params.ScaleOutParams, - ignoreWaiting: tc.params.IgnoreWaiting, + includeWaiting: tc.params.IncludeWaiting, } if _, err := s.Run(); err != nil { From 278baa34f765f924963b596eb4946cd7b3de33c9 Mon Sep 17 00:00:00 2001 From: Lachlan Donald Date: Sun, 5 May 2019 16:41:47 +0800 Subject: [PATCH 6/6] Use max(waiting, running) rather than waiting+running --- scaler/scaler.go | 10 +++++++--- scaler/scaler_test.go | 8 ++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/scaler/scaler.go b/scaler/scaler.go index 53d9092..cb1dd75 100644 --- a/scaler/scaler.go +++ b/scaler/scaler.go @@ -93,12 +93,16 @@ func (s *Scaler) Run() (time.Duration, error) { } } - count := metrics.ScheduledJobs + metrics.RunningJobs + count := metrics.ScheduledJobs - if s.includeWaiting { + // If waiting jobs are greater than running jobs then optionally + // use waiting jobs for scaling so that we have instances booted + // by the time we get to them. This is a gamble, as if the instances + // scale down before the jobs get scheduled, it's a huge waste. + if s.includeWaiting && metrics.WaitingJobs > metrics.RunningJobs { count += metrics.WaitingJobs } else { - log.Printf("💸 Ignoring %d waiting jobs", metrics.WaitingJobs) + count += metrics.RunningJobs } var desired int64 diff --git a/scaler/scaler_test.go b/scaler/scaler_test.go index 661213c..9d91a42 100644 --- a/scaler/scaler_test.go +++ b/scaler/scaler_test.go @@ -31,13 +31,13 @@ func TestScalingOutWithoutError(t *testing.T) { metrics: buildkite.AgentMetrics{ ScheduledJobs: 8, RunningJobs: 2, - WaitingJobs: 2, + WaitingJobs: 20, }, params: Params{ AgentsPerInstance: 1, - IncludeWaiting: true, + IncludeWaiting: true, }, - expectedDesiredCapacity: 12, + expectedDesiredCapacity: 28, }, // Scale-out with multiple agents per instance { @@ -185,7 +185,7 @@ func TestScalingOutWithoutError(t *testing.T) { bk: &buildkiteTestDriver{metrics: tc.metrics}, agentsPerInstance: tc.params.AgentsPerInstance, scaleOutParams: tc.params.ScaleOutParams, - includeWaiting: tc.params.IncludeWaiting, + includeWaiting: tc.params.IncludeWaiting, } if _, err := s.Run(); err != nil {