Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add scaling to include waiting jobs #17

Merged
merged 6 commits into from
May 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions buildkite/buildkite.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type AgentMetrics struct {
ScheduledJobs int64
RunningJobs int64
PollDuration time.Duration
WaitingJobs int64
}

func (c *Client) GetAgentMetrics(queue string) (AgentMetrics, error) {
Expand All @@ -50,6 +51,7 @@ func (c *Client) GetAgentMetrics(queue string) (AgentMetrics, error) {
Queues map[string]struct {
Scheduled int64 `json:"scheduled"`
Running int64 `json:"running"`
Waiting int64 `json:"waiting"`
} `json:"queues"`
} `json:"jobs"`
}
Expand All @@ -70,10 +72,11 @@ func (c *Client) GetAgentMetrics(queue string) (AgentMetrics, error) {
if queue, exists := resp.Jobs.Queues[queue]; exists {
metrics.ScheduledJobs = queue.Scheduled
metrics.RunningJobs = queue.Running
metrics.WaitingJobs = queue.Waiting
}

log.Printf("↳ Got scheduled=%d, running=%d (took %v)",
metrics.ScheduledJobs, metrics.RunningJobs, queryDuration)
log.Printf("↳ Got scheduled=%d, running=%d, waiting=%d (took %v)",
metrics.ScheduledJobs, metrics.RunningJobs, metrics.WaitingJobs, queryDuration)
return metrics, nil
}

Expand Down
11 changes: 10 additions & 1 deletion lambda/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
scaleOutCooldownPeriod time.Duration
scaleOutFactor float64

err error
includeWaiting bool
err error
)

if v := os.Getenv(`LAMBDA_INTERVAL`); v != "" {
Expand Down Expand Up @@ -90,6 +91,12 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
scaleOutFactor = math.Abs(scaleOutFactor)
}

if v := os.Getenv(`INCLUDE_WAITING`); v != "" {
if v == "true" || v == "1" {
includeWaiting = true
}
}

var mustGetEnv = func(env string) string {
val := os.Getenv(env)
if val == "" {
Expand Down Expand Up @@ -135,6 +142,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
BuildkiteQueue: mustGetEnv(`BUILDKITE_QUEUE`),
AutoScalingGroupName: mustGetEnv(`ASG_NAME`),
AgentsPerInstance: mustGetEnvInt(`AGENTS_PER_INSTANCE`),
IncludeWaiting: includeWaiting,
ScaleInParams: scaler.ScaleParams{
CooldownPeriod: scaleInCooldownPeriod,
Factor: scaleInFactor,
Expand Down Expand Up @@ -177,6 +185,7 @@ func Handler(ctx context.Context, evt json.RawMessage) (string, error) {
log.Printf("Increasing poll interval to %v based on rate limit",
interval)
}

// Persist the times back into the global state
lastScaleIn = scaler.LastScaleIn()
lastScaleOut = scaler.LastScaleOut()
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func main() {
// buildkite params
buildkiteQueue = flag.String("queue", "default", "The queue to watch in the metrics")
buildkiteAgentToken = flag.String("agent-token", "", "A buildkite agent registration token")
includeWaiting = flag.Bool("include-waiting", false, "Whether to include jobs behind a wait step for scaling")

// scale in/out params
scaleInFactor = flag.Float64("scale-in-factor", 1.0, "A factor to apply to scale ins")
Expand All @@ -45,6 +46,7 @@ func main() {
AgentsPerInstance: *agentsPerInstance,
PublishCloudWatchMetrics: *cwMetrics,
DryRun: *dryRun,
IncludeWaiting: *includeWaiting,
ScaleInParams: scaler.ScaleParams{Factor: *scaleInFactor},
ScaleOutParams: scaler.ScaleParams{Factor: *scaleOutFactor},
})
Expand Down
16 changes: 15 additions & 1 deletion scaler/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Params struct {
UserAgent string
PublishCloudWatchMetrics bool
DryRun bool
IncludeWaiting bool
ScaleInParams ScaleParams
ScaleOutParams ScaleParams
}
Expand All @@ -38,6 +39,7 @@ type Scaler struct {
metrics interface {
Publish(orgSlug, queue string, metrics map[string]int64) error
}
includeWaiting bool
agentsPerInstance int
scaleInParams ScaleParams
scaleOutParams ScaleParams
Expand All @@ -49,6 +51,7 @@ func NewScaler(client *buildkite.Client, params Params) (*Scaler, error) {
client: client,
queue: params.BuildkiteQueue,
},
includeWaiting: params.IncludeWaiting,
agentsPerInstance: params.AgentsPerInstance,
scaleInParams: params.ScaleInParams,
scaleOutParams: params.ScaleOutParams,
Expand Down Expand Up @@ -83,13 +86,24 @@ func (s *Scaler) Run() (time.Duration, error) {
err = s.metrics.Publish(metrics.OrgSlug, metrics.Queue, map[string]int64{
`ScheduledJobsCount`: metrics.ScheduledJobs,
`RunningJobsCount`: metrics.RunningJobs,
`WaitingJobsCount`: metrics.WaitingJobs,
})
if err != nil {
return metrics.PollDuration, err
}
}

count := metrics.ScheduledJobs + metrics.RunningJobs
count := metrics.ScheduledJobs

// If waiting jobs are greater than running jobs then optionally
// use waiting jobs for scaling so that we have instances booted
// by the time we get to them. This is a gamble, as if the instances
// scale down before the jobs get scheduled, it's a huge waste.
if s.includeWaiting && metrics.WaitingJobs > metrics.RunningJobs {
count += metrics.WaitingJobs
} else {
count += metrics.RunningJobs
}

var desired int64
if count > 0 {
Expand Down
Loading