Fix scale up with no assigned node #467

Merged
merged 12 commits on Nov 3, 2017
121 changes: 83 additions & 38 deletions go/autoscaler/autoscaler.go
@@ -46,6 +46,19 @@ type ClusterResource struct {
MemoryRequestMega int64
MemoryLimitMega int64
MemoryTotalMega int64

NodeInfos NodeInfos
}

// NodeInfos holds the per-node resource information of all nodes.
type NodeInfos struct {
NodesCPUIdleMilli map[string]int64
NodesMemoryFreeMega map[string]int64
}

// String returns the string that represents NodeInfos when printed.
func (n NodeInfos) String() string {
return fmt.Sprintf("NodeInfo(%d nodes)", len(n.NodesCPUIdleMilli))
}

// Cluster represents the cluster management system such as Kubernetes.
@@ -222,6 +235,26 @@ nextJob:
return js
}

func searchAssignableNodeByCPU(r *ClusterResource, j job) (assignable bool) {
for _, idle := range r.NodeInfos.NodesCPUIdleMilli {
if j.TrainerCPURequestMilli() <= idle {
return true
}
}
log.Debug("No node is assignable, job CPU is ", j.TrainerMemRequestMega())
return false
}

func searchAssignableNodeByMem(r *ClusterResource, j job) (assignable bool) {
for _, idle := range r.NodeInfos.NodesMemoryFreeMega {
if j.TrainerMemRequestMega() <= idle {
return true
}
}
log.Debug("No node is assignable, job memory is ", j.TrainerMemRequestMega())
return false
}

func scaleDryRun(r *ClusterResource, j job, curDiff int, maxLoadDesired float64, scaleDown bool) (additional int) {
additionalGPUInstance := 0
additionalCPUInstance := 0
@@ -282,6 +315,12 @@ func scaleDryRun(r *ClusterResource, j job, curDiff int, maxLoadDesired float64,
return
}

if !searchAssignableNodeByCPU(r, j) || !searchAssignableNodeByMem(r, j) {
// cannot find an assignable node, do not scale
additional = 0
Collaborator: Need to return immediately from here, or else the code below may be executed.

Collaborator (Author): Done.

return
}

// NOTE: do not scale up to use full cluster resource of CPU
// but we do scale up for GPU.
if int64(float64(r.CPUTotalMilli)*maxLoadDesired)-r.CPURequestMilli >= cpuRequestMilli {
@@ -349,43 +388,41 @@ func scaleAllDryRun(jobs []job, r ClusterResource, maxLoadDesired float64) map[s
return diff
}

func (a *Autoscaler) scaleAll(diff map[string]int) {
for name := range diff {
if diff[name] != 0 {
log.Info("scaling job",
"name", name, "number of instances", diff[name])
target := *a.jobs[name].TrainerJob.Spec.Parallelism + int32(diff[name])

var err error
for retry := 5; retry > 0; retry-- {
var tj *batchv1.Job
// don't shadow err
tj, err = a.cluster.GetTrainerJob(a.jobs[name].Config)
if err != nil {
log.Warn("sync trainer job error",
"error", err, "remaining retry", retry)
continue
}
j := a.jobs[name]
// NOTE: only update batchv1.job from k8s api-server before updating
// for efficiency. Update the job resource to get latest k8s
// resource version.
j.TrainerJob = tj
a.jobs[name] = j
*a.jobs[name].TrainerJob.Spec.Parallelism = target
err = a.cluster.UpdateTrainerJob(a.jobs[name].TrainerJob)
if err != nil {
log.Error("error updating trainer job",
"error", err, "remaining retry", retry)
continue
}

break
func (a *Autoscaler) scaleAll(target map[string]int) {
for name := range target {
log.Info("scaling job",
"name", name, "target", target[name])
target := int32(target[name])

var err error
for retry := 5; retry > 0; retry-- {
var tj *batchv1.Job
// don't shadow err
tj, err = a.cluster.GetTrainerJob(a.jobs[name].Config)
if err != nil {
log.Warn("sync trainer job error",
"error", err, "remaining retry", retry)
continue
}

j := a.jobs[name]
// NOTE: only update batchv1.job from k8s api-server before updating
// for efficiency. Update the job resource to get latest k8s
// resource version.
j.TrainerJob = tj
a.jobs[name] = j
*a.jobs[name].TrainerJob.Spec.Parallelism = target
err = a.cluster.UpdateTrainerJob(a.jobs[name].TrainerJob)
if err != nil {
log.Warn("Error updating trainer job", "error", err)
log.Error("error updating trainer job",
"error", err, "remaining retry", retry)
continue
}

break
}

if err != nil {
log.Warn("Error updating trainer job", "error", err)
}
}
}
@@ -444,9 +481,10 @@ func (a *Autoscaler) Monitor() {
log.Error("error sync resource", "error", err)
continue
}
log.Info("sync cluster resource done", "resource", r)

var js []job
for _, j := range a.jobs {
for key, j := range a.jobs {
// The k8s job for trainers may not be created immediately;
// try to sync it here.
if j.TrainerJob == nil {
@@ -460,7 +498,9 @@
continue
}
j.TrainerJob = tj
a.jobs[key] = j
Collaborator: Need to write back to the map; j.TrainerJob = tj alone is not sufficient.
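A minimal standalone illustration of that point, using a simplified job struct rather than the PR's actual types: indexing a Go map yields a copy of the value, so mutating the copy does not change what the map stores until it is written back.

package main

import "fmt"

type job struct {
	TrainerJob *int
}

func main() {
	jobs := map[string]job{"a": {}}

	v := 1
	j := jobs["a"]    // j is a copy of the map value
	j.TrainerJob = &v // mutates only the copy
	fmt.Println(jobs["a"].TrainerJob) // <nil>: the map still holds the old value

	jobs["a"] = j // write the modified copy back, as the PR does with a.jobs[key] = j
	fmt.Println(*jobs["a"].TrainerJob) // 1
}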

Collaborator (@typhoonzero, Nov 3, 2017): Refining the code later is fine. Also, the scaling algorithm should be updated like this:

  1. ScaleDryRun searches the nodes in order to find one with sufficient resources to scale up 1 pod, and returns the diff.
  2. Cluster-wide total resources are not used for the scale-up; use only per-node resources.
  3. Find a way to ensure max_load_desired for each node.

I'll add a design doc for details.
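A rough sketch of item 1 above, using this PR's NodeInfos type; the helper name and the policy are hypothetical and not part of this PR. Enforcing max_load_desired per node (item 3) would also need each node's total capacity, which NodeInfos does not carry yet.

// pickNodeForOnePod walks the per-node idle resources and returns a node that
// could host one more trainer pod with the given requests, if any exists.
func pickNodeForOnePod(n NodeInfos, cpuRequestMilli, memRequestMega int64) (string, bool) {
	for name, idleCPU := range n.NodesCPUIdleMilli {
		if idleCPU >= cpuRequestMilli && n.NodesMemoryFreeMega[name] >= memRequestMega {
			return name, true
		}
	}
	return "", false
}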

Collaborator (@helinwang, Nov 3, 2017): Thanks @typhoonzero!

Btw, maybe max_load_desired should apply to the entire cluster? If it were applied per node with max_load_desired=0.97, each node would keep only 0.03*12=0.36 CPUs of headroom, and many pods probably could not be scheduled into that anyway.
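The arithmetic behind that comment, as a standalone sketch; the node and cluster sizes are assumed for illustration only.

package main

import "fmt"

func main() {
	const maxLoadDesired = 0.97
	const cpusPerNode = 12.0 // node size assumed in the comment above
	const nodeCount = 10.0   // hypothetical cluster size

	perNodeHeadroom := (1 - maxLoadDesired) * cpusPerNode
	clusterHeadroom := perNodeHeadroom * nodeCount
	fmt.Printf("per node: %.2f CPUs, cluster-wide: %.1f CPUs\n", perNodeHeadroom, clusterHeadroom)
	// prints: per node: 0.36 CPUs, cluster-wide: 3.6 CPUs
}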

}

// Scale jobs only when all pods' "Phase" are running.
// Pending/starting/terminating jobs are ignored.
total, running, err := a.cluster.JobPods(j.Config)
@@ -473,10 +513,15 @@
}
}
diff := scaleAllDryRun(js, r, a.maxLoadDesired)
if len(diff) > 0 {
log.Info("calculated scaling plan", "plan", diff,
target := make(map[string]int)
for k, v := range diff {
target[k] = int(*a.jobs[k].TrainerJob.Spec.Parallelism) + v
}

if len(target) > 0 {
log.Info("calculated scaling plan", "target", target,
"clusterResource", r)
}
a.scaleAll(diff)
a.scaleAll(target)
}
}
18 changes: 18 additions & 0 deletions go/autoscaler/autoscaler_internal_test.go
@@ -105,6 +105,11 @@ func TestScaleDryRunSatisfied(t *testing.T) {
assert.Equal(t, 0, scaleDryRun(&r, j, 0, 1.0, false))
}

var allIdleNodeInfos = NodeInfos{
NodesCPUIdleMilli: map[string]int64{"": 99999},
NodesMemoryFreeMega: map[string]int64{"": 99999},
}

func TestScaleDryRunMoreCPU(t *testing.T) {
r := ClusterResource{
CPULimitMilli: 100,
@@ -113,6 +118,7 @@ func TestScaleDryRunMoreCPU(t *testing.T) {
MemoryRequestMega: 100,
MemoryLimitMega: 100,
MemoryTotalMega: 1000,
NodeInfos: allIdleNodeInfos,
}
j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
assert.Equal(t, 1, scaleDryRun(&r, j, 0, 1.0, false))
@@ -126,6 +132,7 @@ func TestScaleDryRunNoMoreCPU(t *testing.T) {
MemoryRequestMega: 100,
MemoryLimitMega: 100,
MemoryTotalMega: 1000,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
@@ -143,6 +150,7 @@ func TestScaleDryRunMoreGPU(t *testing.T) {
GPULimit: 0,
GPURequest: 0,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}
j := makeJob("name", "1", "1", "10Mi", "10Mi", "1", 1, 3, 1)
assert.Equal(t, 1, scaleDryRun(&r, j, 0, 1.0, false))
@@ -160,6 +168,7 @@ func TestScaleDryRunNoMoreGPU(t *testing.T) {
GPULimit: 10,
GPURequest: 10,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "10Mi", "10Mi", "1", 1, 3, 1)
@@ -197,6 +206,7 @@ func TestScaleDryRunScaleDownToMin(t *testing.T) {
GPULimit: 10,
GPURequest: 10,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "10Mi", "10Mi", "0", 1, 3, 3)
@@ -216,6 +226,7 @@ func TestScaleDryRunScaleDownFullCluster(t *testing.T) {
GPULimit: 10,
GPURequest: 10,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "10Mi", "10Mi", "0", 1, 3, 3)
@@ -234,6 +245,7 @@ func TestScaleDryRunNoMem(t *testing.T) {
GPULimit: 10,
GPURequest: 10,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
@@ -247,6 +259,7 @@ func TestScaleAllDryRunNoMem(t *testing.T) {
MemoryLimitMega: 1000,
MemoryTotalMega: 1000,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1)
@@ -265,6 +278,7 @@ func TestScaleAllDryRun(t *testing.T) {
GPULimit: 8,
GPURequest: 8,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
@@ -283,6 +297,7 @@ func TestScaleAllDryRunNotFull(t *testing.T) {
GPULimit: 0,
GPURequest: 0,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1)
@@ -301,6 +316,7 @@ func TestScaleAllDryRunDownNotFull(t *testing.T) {
GPULimit: 0,
GPURequest: 0,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 3)
@@ -319,6 +335,7 @@ func TestScaleAllDryRunLessCPU(t *testing.T) {
GPULimit: 8,
GPURequest: 8,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1)
@@ -337,6 +354,7 @@ func TestScaleAllDryRunLessGPU(t *testing.T) {
GPULimit: 9,
GPURequest: 9,
GPUTotal: 10,
NodeInfos: allIdleNodeInfos,
}

j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1)
2 changes: 1 addition & 1 deletion go/cmd/controller/controller.go
@@ -17,7 +17,7 @@ import (
func main() {
kubeconfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.")
logLevel := flag.String("log_level", "info", "Log level can be debug, info, warn, error, crit.")
maxLoadDesired := flag.Float64("max_load_desired", 0.9, `Keep the cluster max resource usage around
maxLoadDesired := flag.Float64("max_load_desired", 0.97, `Keep the cluster max resource usage around
this value, jobs will scale down if total request is over this level.`)
flag.Parse()

37 changes: 36 additions & 1 deletion go/controller/cluster.go
@@ -95,6 +95,25 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l
return
}

func updateNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryFreeMega map[string]int64) (err error) {
for _, pod := range podList.Items {
nodeName := pod.Spec.NodeName
if nodeName == "" {
continue
}
for _, container := range pod.Spec.Containers {
Collaborator: Should we check only pending and running pods?

Collaborator (Author): Only Terminating and Running pods take up resources.

Collaborator: Oh ok, so maybe we need to check only "Terminating and Running" pods? :)

Collaborator (Author, @Yancey1989, Nov 2, 2017): I think so. Following the comment at https://github.com/kubernetes/kubernetes/blob/v1.6.2/pkg/api/v1/types.go#L2331, if NodeName is non-empty the Pod has been assigned to that Node, which means it takes up resources on the Node.

Collaborator: I see, so it's not necessary to check the pod status: as long as pod.Spec.NodeName is not empty, the pod is assigned to a node and occupying resources.

nodesCPUIdleMilli[nodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli)
nodesMemoryFreeMega[nodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega)
}

for _, container := range pod.Spec.InitContainers {
nodesCPUIdleMilli[nodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli)
nodesMemoryFreeMega[nodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega)
}
}
return
}
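A test-style sketch of what this helper does, meant to live in the same package so it can reuse the existing v1 and resource imports; the node name and request values are made up.

func exampleUpdateNodesIdleResource() {
	// Per-node idle values start from each node's Allocatable (see SyncResource below).
	nodesCPUIdleMilli := map[string]int64{"node-1": 4000}   // milli-CPU
	nodesMemoryFreeMega := map[string]int64{"node-1": 8000} // MB

	// One pod already assigned to node-1, requesting 500m CPU and 256Mi memory.
	pods := &v1.PodList{Items: []v1.Pod{{
		Spec: v1.PodSpec{
			NodeName: "node-1",
			Containers: []v1.Container{{
				Resources: v1.ResourceRequirements{
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("500m"),
						v1.ResourceMemory: resource.MustParse("256Mi"),
					},
				},
			}},
		},
	}}}

	if err := updateNodesIdleResource(pods, nodesCPUIdleMilli, nodesMemoryFreeMega); err != nil {
		panic(err)
	}
	// nodesCPUIdleMilli["node-1"] is now 3500; nodesMemoryFreeMega["node-1"]
	// dropped by roughly 269 (256Mi expressed in megabytes, rounded up).
}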

// SyncResource will update free and total resources in k8s cluster.
func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) {
nodes := c.clientset.CoreV1().Nodes()
@@ -103,7 +122,14 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) {
return autoscaler.ClusterResource{}, err
}
allocatable := make(v1.ResourceList)
nodesCPUIdleMilli := make(map[string]int64)
nodesMemoryFreeMega := make(map[string]int64)

for _, node := range nodeList.Items {
nodesCPUIdleMilli[node.GetObjectMeta().GetName()] =
node.Status.Allocatable.Cpu().ScaledValue(resource.Milli)
nodesMemoryFreeMega[node.GetObjectMeta().GetName()] =
node.Status.Allocatable.Memory().ScaledValue(resource.Mega)
AddResourceList(allocatable, node.Status.Allocatable)
}

@@ -130,6 +156,11 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) {
return autoscaler.ClusterResource{}, err
}

err = updateNodesIdleResource(allPodsList, nodesCPUIdleMilli, nodesMemoryFreeMega)
if err != nil {
return autoscaler.ClusterResource{}, err
}

res = autoscaler.ClusterResource{
NodeCount: len(nodeList.Items),
GPUTotal: int(allocatable.NvidiaGPU().Value()),
@@ -143,7 +174,11 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) {
GPULimit: int(allLimits.NvidiaGPU().Value()),
CPULimitMilli: allLimits.Cpu().ScaledValue(resource.Milli),
MemoryLimitMega: allLimits.Memory().ScaledValue(resource.Mega),
}

NodeInfos: autoscaler.NodeInfos{
NodesCPUIdleMilli: nodesCPUIdleMilli,
NodesMemoryFreeMega: nodesMemoryFreeMega,
},
}
return
}