From 8ca7086dd2bb6564fd71698ccd0ca4760671f1ba Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 2 Nov 2017 17:01:00 +0800 Subject: [PATCH 01/11] scan all nodes idle resource --- go/autoscaler/autoscaler.go | 25 +++++++++++++++++++++++++ go/controller/cluster.go | 30 ++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/go/autoscaler/autoscaler.go b/go/autoscaler/autoscaler.go index 10833724..23fdb7fe 100644 --- a/go/autoscaler/autoscaler.go +++ b/go/autoscaler/autoscaler.go @@ -46,6 +46,9 @@ type ClusterResource struct { MemoryRequestMega int64 MemoryLimitMega int64 MemoryTotalMega int64 + + NodesCPUIdleMilli map[string]int64 + NodesMemoryIdleMega map[string]int64 } // Cluster represents the cluster managment system such as Kubernetes. @@ -267,6 +270,28 @@ func scaleDryRun(r *ClusterResource, j job, curDiff int, scaleDown bool) (additi return } + assignable := false + + for name, idle := range r.NodesCPUIdleMilli { + log.Debug("CPU idle: ", idle, ", name: ", name) + if cpuRequestMilli <= idle { + assignable = true + break + } + } + + for _, idle := range r.NodesMemoryIdleMega { + if memRequestMega <= idle { + assignable = true + break + } + } + + if assignable == false { + additional = 0 + return + } + if r.CPUTotalMilli-r.CPURequestMilli >= cpuRequestMilli { additionalCPUInstance = 1 } diff --git a/go/controller/cluster.go b/go/controller/cluster.go index 3f28c175..5b479602 100644 --- a/go/controller/cluster.go +++ b/go/controller/cluster.go @@ -95,6 +95,21 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l return } +func asyncNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryIdleMega map[string]int64) (err error) { + for _, pod := range podList.Items { + for _, container := range pod.Spec.Containers { + nodesCPUIdleMilli[pod.Spec.NodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) + nodesMemoryIdleMega[pod.Spec.NodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) + } + + for _, container := range pod.Spec.InitContainers { + nodesCPUIdleMilli[pod.Spec.NodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) + nodesMemoryIdleMega[pod.Spec.NodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) + } + } + return +} + // SyncResource will update free and total resources in k8s cluster. 
func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { nodes := c.clientset.CoreV1().Nodes() @@ -103,7 +118,14 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { return autoscaler.ClusterResource{}, err } allocatable := make(v1.ResourceList) + nodesCPUIdleMilli := make(map[string]int64) + nodesMemoryIdleMega := make(map[string]int64) + for _, node := range nodeList.Items { + nodesCPUIdleMilli[node.GetObjectMeta().GetName()] = + node.Status.Allocatable.Cpu().ScaledValue(resource.Milli) + nodesMemoryIdleMega[node.GetObjectMeta().GetName()] = + node.Status.Allocatable.Memory().ScaledValue(resource.Mega) AddResourceList(allocatable, node.Status.Allocatable) } @@ -130,6 +152,11 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { return autoscaler.ClusterResource{}, err } + err = asyncNodesIdleResource(allPodsList, nodesCPUIdleMilli, nodesMemoryIdleMega) + if err != nil { + return autoscaler.ClusterResource{}, err + } + res = autoscaler.ClusterResource{ NodeCount: len(nodeList.Items), GPUTotal: int(allocatable.NvidiaGPU().Value()), @@ -143,6 +170,9 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { GPULimit: int(allLimits.NvidiaGPU().Value()), CPULimitMilli: allLimits.Cpu().ScaledValue(resource.Milli), MemoryLimitMega: allLimits.Memory().ScaledValue(resource.Mega), + + NodesCPUIdleMilli: nodesCPUIdleMilli, + nodesMemoryIdleMega: nodesMemoryIdleMega, } return From 8b8291d922a8510a119befaa88559c05f34a20fe Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 2 Nov 2017 18:48:49 +0800 Subject: [PATCH 02/11] fix scale up with no assignal node --- go/autoscaler/autoscaler.go | 7 +++---- go/controller/cluster.go | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/go/autoscaler/autoscaler.go b/go/autoscaler/autoscaler.go index 3033b240..63bfd1a2 100644 --- a/go/autoscaler/autoscaler.go +++ b/go/autoscaler/autoscaler.go @@ -226,17 +226,15 @@ nextJob: } func searchAssignableNodeByCPU(r *ClusterResource, j job) (assignable bool) { - log.Debug("Search assignale node...") assignable = false - for name, idle := range r.NodesCPUIdleMilli { - log.Debug("CPU idle: ", idle, ", name: ", name) - + for _, idle := range r.NodesCPUIdleMilli { if j.TrainerCPURequestMilli() <= idle { assignable = true return } } + log.Debug("No node is assignable, job CPU is ", j.TrainerMemRequestMega()) return } @@ -249,6 +247,7 @@ func searchAssignableNodeByMem(r *ClusterResource, j job) (assignable bool) { return } } + log.Debug("No node is assignable, job memory is ", j.TrainerMemRequestMega()) return } diff --git a/go/controller/cluster.go b/go/controller/cluster.go index bcf72fa3..ec739826 100644 --- a/go/controller/cluster.go +++ b/go/controller/cluster.go @@ -172,8 +172,7 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { MemoryLimitMega: allLimits.Memory().ScaledValue(resource.Mega), NodesCPUIdleMilli: nodesCPUIdleMilli, - nodesMemoryIdleMega: nodesMemoryIdleMega, + NodesMemoryIdleMega: nodesMemoryIdleMega, } - return } From ae12903a6e54c7f33a7e640363545d406e4bdaff Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 2 Nov 2017 18:50:57 +0800 Subject: [PATCH 03/11] update --- go/autoscaler/autoscaler.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/go/autoscaler/autoscaler.go b/go/autoscaler/autoscaler.go index 63bfd1a2..2d152944 100644 --- a/go/autoscaler/autoscaler.go +++ b/go/autoscaler/autoscaler.go @@ -227,7 +227,6 @@ nextJob: func 
searchAssignableNodeByCPU(r *ClusterResource, j job) (assignable bool) { assignable = false - for _, idle := range r.NodesCPUIdleMilli { if j.TrainerCPURequestMilli() <= idle { assignable = true @@ -240,7 +239,6 @@ func searchAssignableNodeByCPU(r *ClusterResource, j job) (assignable bool) { func searchAssignableNodeByMem(r *ClusterResource, j job) (assignable bool) { assignable = false - for _, idle := range r.NodesMemoryIdleMega { if j.TrainerMemRequestMega() <= idle { assignable = true From 36697991b5d3ae2a078dab528ae02e916907dcd6 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 2 Nov 2017 18:58:33 +0800 Subject: [PATCH 04/11] update --- go/controller/cluster.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/go/controller/cluster.go b/go/controller/cluster.go index ec739826..e7bbfe3c 100644 --- a/go/controller/cluster.go +++ b/go/controller/cluster.go @@ -97,14 +97,19 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l func syncNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryIdleMega map[string]int64) (err error) { for _, pod := range podList.Items { + nodeName := pod.Spec.NodeName + // if nodeName is empty, the pod is not assigned. + if nodeName == "" { + continue + } for _, container := range pod.Spec.Containers { - nodesCPUIdleMilli[pod.Spec.NodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) - nodesMemoryIdleMega[pod.Spec.NodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) + nodesCPUIdleMilli[nodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) + nodesMemoryIdleMega[nodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) } for _, container := range pod.Spec.InitContainers { - nodesCPUIdleMilli[pod.Spec.NodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) - nodesMemoryIdleMega[pod.Spec.NodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) + nodesCPUIdleMilli[nodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) + nodesMemoryIdleMega[nodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) } } return From 65b694445cc82884f549d749723fab8cababe62d Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 2 Nov 2017 21:19:29 +0800 Subject: [PATCH 05/11] update --- go/autoscaler/autoscaler.go | 17 +++++++---------- go/controller/cluster.go | 3 +-- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/go/autoscaler/autoscaler.go b/go/autoscaler/autoscaler.go index 2d152944..38861d9b 100644 --- a/go/autoscaler/autoscaler.go +++ b/go/autoscaler/autoscaler.go @@ -48,7 +48,7 @@ type ClusterResource struct { MemoryTotalMega int64 NodesCPUIdleMilli map[string]int64 - NodesMemoryIdleMega map[string]int64 + NodesMemoryFreeMega map[string]int64 } // Cluster represents the cluster managment system such as Kubernetes. 
@@ -226,27 +226,23 @@ nextJob: } func searchAssignableNodeByCPU(r *ClusterResource, j job) (assignable bool) { - assignable = false for _, idle := range r.NodesCPUIdleMilli { if j.TrainerCPURequestMilli() <= idle { - assignable = true - return + return true } } log.Debug("No node is assignable, job CPU is ", j.TrainerMemRequestMega()) - return + return false } func searchAssignableNodeByMem(r *ClusterResource, j job) (assignable bool) { - assignable = false - for _, idle := range r.NodesMemoryIdleMega { + for _, idle := range r.NodesMemoryFreeMega { if j.TrainerMemRequestMega() <= idle { - assignable = true - return + return true } } log.Debug("No node is assignable, job memory is ", j.TrainerMemRequestMega()) - return + return false } func scaleDryRun(r *ClusterResource, j job, curDiff int, maxLoadDesired float64, scaleDown bool) (additional int) { @@ -312,6 +308,7 @@ func scaleDryRun(r *ClusterResource, j job, curDiff int, maxLoadDesired float64, if !searchAssignableNodeByCPU(r, j) || !searchAssignableNodeByMem(r, j) { // can not find assignable node, do not scale additional = 0 + return } // NOTE: do not scale up to use full cluster resource of CPU diff --git a/go/controller/cluster.go b/go/controller/cluster.go index e7bbfe3c..da39f954 100644 --- a/go/controller/cluster.go +++ b/go/controller/cluster.go @@ -98,7 +98,6 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l func syncNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryIdleMega map[string]int64) (err error) { for _, pod := range podList.Items { nodeName := pod.Spec.NodeName - // if nodeName is empty, the pod is not assigned. if nodeName == "" { continue } @@ -177,7 +176,7 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { MemoryLimitMega: allLimits.Memory().ScaledValue(resource.Mega), NodesCPUIdleMilli: nodesCPUIdleMilli, - NodesMemoryIdleMega: nodesMemoryIdleMega, + NodesMemoryFreeMega: nodesMemoryFreeMega, } return } From 15082e04e607453ce5effd80b233d43f1167c8c4 Mon Sep 17 00:00:00 2001 From: Yancey1989 Date: Thu, 2 Nov 2017 21:26:07 +0800 Subject: [PATCH 06/11] update --- go/controller/cluster.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/controller/cluster.go b/go/controller/cluster.go index da39f954..c0214f6f 100644 --- a/go/controller/cluster.go +++ b/go/controller/cluster.go @@ -95,7 +95,7 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l return } -func syncNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryIdleMega map[string]int64) (err error) { +func syncNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryFreeMega map[string]int64) (err error) { for _, pod := range podList.Items { nodeName := pod.Spec.NodeName if nodeName == "" { @@ -103,12 +103,12 @@ func syncNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int } for _, container := range pod.Spec.Containers { nodesCPUIdleMilli[nodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) - nodesMemoryIdleMega[nodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) + nodesMemoryFreeMega[nodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) } for _, container := range pod.Spec.InitContainers { nodesCPUIdleMilli[nodeName] -= container.Resources.Requests.Cpu().ScaledValue(resource.Milli) - nodesMemoryIdleMega[nodeName] -= 
container.Resources.Requests.Memory().ScaledValue(resource.Mega) + nodesMemoryFreeMega[nodeName] -= container.Resources.Requests.Memory().ScaledValue(resource.Mega) } } return @@ -123,12 +123,12 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { } allocatable := make(v1.ResourceList) nodesCPUIdleMilli := make(map[string]int64) - nodesMemoryIdleMega := make(map[string]int64) + nodesMemoryFreeMega := make(map[string]int64) for _, node := range nodeList.Items { nodesCPUIdleMilli[node.GetObjectMeta().GetName()] = node.Status.Allocatable.Cpu().ScaledValue(resource.Milli) - nodesMemoryIdleMega[node.GetObjectMeta().GetName()] = + nodesMemoryFreeMega[node.GetObjectMeta().GetName()] = node.Status.Allocatable.Memory().ScaledValue(resource.Mega) AddResourceList(allocatable, node.Status.Allocatable) } @@ -156,7 +156,7 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { return autoscaler.ClusterResource{}, err } - err = syncNodesIdleResource(allPodsList, nodesCPUIdleMilli, nodesMemoryIdleMega) + err = syncNodesIdleResource(allPodsList, nodesCPUIdleMilli, nodesMemoryFreeMega) if err != nil { return autoscaler.ClusterResource{}, err } From 2529a3eed1aa999a214d3ba84fd12dc5e49d473c Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 2 Nov 2017 21:23:48 +0000 Subject: [PATCH 07/11] Print less log, fix unit test --- go/autoscaler/autoscaler.go | 14 ++++++++++++-- go/autoscaler/autoscaler_internal_test.go | 18 ++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/go/autoscaler/autoscaler.go b/go/autoscaler/autoscaler.go index 38861d9b..24a6b99b 100644 --- a/go/autoscaler/autoscaler.go +++ b/go/autoscaler/autoscaler.go @@ -47,10 +47,20 @@ type ClusterResource struct { MemoryLimitMega int64 MemoryTotalMega int64 + NodeInfos NodeInfos +} + +// NodeInfos is the information of all nodes. +type NodeInfos struct { NodesCPUIdleMilli map[string]int64 NodesMemoryFreeMega map[string]int64 } +// String is the string that represents NodeInfo when printed. +func (n NodeInfos) String() string { + return fmt.Sprintf("NodeInfo(%d nodes)", len(n.NodesCPUIdleMilli)) +} + // Cluster represents the cluster managment system such as Kubernetes. type Cluster interface { // SyncResource will sync resource values with the cluster. 
@@ -226,7 +236,7 @@ nextJob: } func searchAssignableNodeByCPU(r *ClusterResource, j job) (assignable bool) { - for _, idle := range r.NodesCPUIdleMilli { + for _, idle := range r.NodeInfos.NodesCPUIdleMilli { if j.TrainerCPURequestMilli() <= idle { return true } @@ -236,7 +246,7 @@ func searchAssignableNodeByCPU(r *ClusterResource, j job) (assignable bool) { } func searchAssignableNodeByMem(r *ClusterResource, j job) (assignable bool) { - for _, idle := range r.NodesMemoryFreeMega { + for _, idle := range r.NodeInfos.NodesMemoryFreeMega { if j.TrainerMemRequestMega() <= idle { return true } diff --git a/go/autoscaler/autoscaler_internal_test.go b/go/autoscaler/autoscaler_internal_test.go index d29f48e5..a340cae7 100644 --- a/go/autoscaler/autoscaler_internal_test.go +++ b/go/autoscaler/autoscaler_internal_test.go @@ -105,6 +105,11 @@ func TestScaleDryRunSatisfied(t *testing.T) { assert.Equal(t, 0, scaleDryRun(&r, j, 0, 1.0, false)) } +var allIdleNodeInfos = NodeInfos{ + NodesCPUIdleMilli: map[string]int64{"": 99999}, + NodesMemoryFreeMega: map[string]int64{"": 99999}, +} + func TestScaleDryRunMoreCPU(t *testing.T) { r := ClusterResource{ CPULimitMilli: 100, @@ -113,6 +118,7 @@ func TestScaleDryRunMoreCPU(t *testing.T) { MemoryRequestMega: 100, MemoryLimitMega: 100, MemoryTotalMega: 1000, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1) assert.Equal(t, 1, scaleDryRun(&r, j, 0, 1.0, false)) @@ -126,6 +132,7 @@ func TestScaleDryRunNoMoreCPU(t *testing.T) { MemoryRequestMega: 100, MemoryLimitMega: 100, MemoryTotalMega: 1000, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1) @@ -143,6 +150,7 @@ func TestScaleDryRunMoreGPU(t *testing.T) { GPULimit: 0, GPURequest: 0, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "10Mi", "10Mi", "1", 1, 3, 1) assert.Equal(t, 1, scaleDryRun(&r, j, 0, 1.0, false)) @@ -160,6 +168,7 @@ func TestScaleDryRunNoMoreGPU(t *testing.T) { GPULimit: 10, GPURequest: 10, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "10Mi", "10Mi", "1", 1, 3, 1) @@ -197,6 +206,7 @@ func TestScaleDryRunScaleDownToMin(t *testing.T) { GPULimit: 10, GPURequest: 10, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "10Mi", "10Mi", "0", 1, 3, 3) @@ -216,6 +226,7 @@ func TestScaleDryRunScaleDownFullCluster(t *testing.T) { GPULimit: 10, GPURequest: 10, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "10Mi", "10Mi", "0", 1, 3, 3) @@ -234,6 +245,7 @@ func TestScaleDryRunNoMem(t *testing.T) { GPULimit: 10, GPURequest: 10, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1) @@ -247,6 +259,7 @@ func TestScaleAllDryRunNoMem(t *testing.T) { MemoryLimitMega: 1000, MemoryTotalMega: 1000, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1) @@ -265,6 +278,7 @@ func TestScaleAllDryRun(t *testing.T) { GPULimit: 8, GPURequest: 8, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1) @@ -283,6 +297,7 @@ func TestScaleAllDryRunNotFull(t *testing.T) { GPULimit: 0, GPURequest: 0, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 1) @@ -301,6 +316,7 @@ func TestScaleAllDryRunDownNotFull(t *testing.T) { GPULimit: 0, GPURequest: 0, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := 
makeJob("name", "1", "1", "100Mi", "100Mi", "0", 1, 3, 3) @@ -319,6 +335,7 @@ func TestScaleAllDryRunLessCPU(t *testing.T) { GPULimit: 8, GPURequest: 8, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1) @@ -337,6 +354,7 @@ func TestScaleAllDryRunLessGPU(t *testing.T) { GPULimit: 9, GPURequest: 9, GPUTotal: 10, + NodeInfos: allIdleNodeInfos, } j := makeJob("name", "1", "1", "1", "1", "1", 1, 3, 1) From c6545e1127fca2392b4a8ca39335455a5a3031cc Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 2 Nov 2017 21:33:10 +0000 Subject: [PATCH 08/11] fix build --- go/controller/cluster.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/go/controller/cluster.go b/go/controller/cluster.go index c0214f6f..a05d6aba 100644 --- a/go/controller/cluster.go +++ b/go/controller/cluster.go @@ -175,8 +175,10 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { CPULimitMilli: allLimits.Cpu().ScaledValue(resource.Milli), MemoryLimitMega: allLimits.Memory().ScaledValue(resource.Mega), - NodesCPUIdleMilli: nodesCPUIdleMilli, - NodesMemoryFreeMega: nodesMemoryFreeMega, + NodeInfos: autoscaler.NodeInfos{ + NodesCPUIdleMilli: nodesCPUIdleMilli, + NodesMemoryFreeMega: nodesMemoryFreeMega, + }, } return } From c4c74f8ef72c459202f317caf5d5e4b0c559faf1 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 2 Nov 2017 22:28:53 +0000 Subject: [PATCH 09/11] Fix crash, add logs --- go/autoscaler/autoscaler.go | 8 +++++++- go/controller/cluster.go | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/go/autoscaler/autoscaler.go b/go/autoscaler/autoscaler.go index 24a6b99b..7c8c2271 100644 --- a/go/autoscaler/autoscaler.go +++ b/go/autoscaler/autoscaler.go @@ -56,7 +56,7 @@ type NodeInfos struct { NodesMemoryFreeMega map[string]int64 } -// String is the string that represents NodeInfo when printed. +// String returns the string that represents NodeInfo when printed. 
func (n NodeInfos) String() string { return fmt.Sprintf("NodeInfo(%d nodes)", len(n.NodesCPUIdleMilli)) } @@ -391,6 +391,11 @@ func scaleAllDryRun(jobs []job, r ClusterResource, maxLoadDesired float64) map[s func (a *Autoscaler) scaleAll(diff map[string]int) { for name := range diff { if diff[name] != 0 { + if a.jobs[name].TrainerJob == nil { + log.Info("Trainer job not found yet, will skip scaling", "name", name) + continue + } + log.Info("scaling job", "name", name, "number of instances", diff[name]) target := *a.jobs[name].TrainerJob.Spec.Parallelism + int32(diff[name]) @@ -483,6 +488,7 @@ func (a *Autoscaler) Monitor() { log.Error("error sync resource", "error", err) continue } + log.Info("sync cluster resource done", "resource", r) var js []job for _, j := range a.jobs { diff --git a/go/controller/cluster.go b/go/controller/cluster.go index a05d6aba..415d18d3 100644 --- a/go/controller/cluster.go +++ b/go/controller/cluster.go @@ -95,7 +95,7 @@ func getPodsTotalRequestsAndLimits(podList *v1.PodList) (reqs v1.ResourceList, l return } -func syncNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryFreeMega map[string]int64) (err error) { +func updateNodesIdleResource(podList *v1.PodList, nodesCPUIdleMilli map[string]int64, nodesMemoryFreeMega map[string]int64) (err error) { for _, pod := range podList.Items { nodeName := pod.Spec.NodeName if nodeName == "" { @@ -156,7 +156,7 @@ func (c *Cluster) SyncResource() (res autoscaler.ClusterResource, err error) { return autoscaler.ClusterResource{}, err } - err = syncNodesIdleResource(allPodsList, nodesCPUIdleMilli, nodesMemoryFreeMega) + err = updateNodesIdleResource(allPodsList, nodesCPUIdleMilli, nodesMemoryFreeMega) if err != nil { return autoscaler.ClusterResource{}, err } From 692e89df87a14505ba23012a9e18c2d5e69a1664 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 2 Nov 2017 22:39:51 +0000 Subject: [PATCH 10/11] Scale all by target rather than by diff, fix TrainerJob maybe nil --- go/autoscaler/autoscaler.go | 84 ++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 42 deletions(-) diff --git a/go/autoscaler/autoscaler.go b/go/autoscaler/autoscaler.go index 7c8c2271..41885bab 100644 --- a/go/autoscaler/autoscaler.go +++ b/go/autoscaler/autoscaler.go @@ -388,48 +388,41 @@ func scaleAllDryRun(jobs []job, r ClusterResource, maxLoadDesired float64) map[s return diff } -func (a *Autoscaler) scaleAll(diff map[string]int) { - for name := range diff { - if diff[name] != 0 { - if a.jobs[name].TrainerJob == nil { - log.Info("Trainer job not found yet, will skip scaling", "name", name) +func (a *Autoscaler) scaleAll(target map[string]int) { + for name := range target { + log.Info("scaling job", + "name", name, "target", target[name]) + target := int32(target[name]) + + var err error + for retry := 5; retry > 0; retry-- { + var tj *batchv1.Job + // don't shadow err + tj, err = a.cluster.GetTrainerJob(a.jobs[name].Config) + if err != nil { + log.Warn("sync trainer job error", + "error", err, "remaining retry", retry) continue } - - log.Info("scaling job", - "name", name, "number of instances", diff[name]) - target := *a.jobs[name].TrainerJob.Spec.Parallelism + int32(diff[name]) - - var err error - for retry := 5; retry > 0; retry-- { - var tj *batchv1.Job - // don't shadow err - tj, err = a.cluster.GetTrainerJob(a.jobs[name].Config) - if err != nil { - log.Warn("sync trainer job error", - "error", err, "remaining retry", retry) - continue - } - j := a.jobs[name] - // NOTE: only update batchv1.job 
from k8s api-server before updating - // for efficiency. Update the job resource to get latest k8s - // resource reversion. - j.TrainerJob = tj - a.jobs[name] = j - *a.jobs[name].TrainerJob.Spec.Parallelism = target - err = a.cluster.UpdateTrainerJob(a.jobs[name].TrainerJob) - if err != nil { - log.Error("error updating trainer job", - "error", err, "remaining retry", retry) - continue - } - - break - } - + j := a.jobs[name] + // NOTE: only update batchv1.job from k8s api-server before updating + // for efficiency. Update the job resource to get latest k8s + // resource reversion. + j.TrainerJob = tj + a.jobs[name] = j + *a.jobs[name].TrainerJob.Spec.Parallelism = target + err = a.cluster.UpdateTrainerJob(a.jobs[name].TrainerJob) if err != nil { - log.Warn("Error updating trainer job", "error", err) + log.Error("error updating trainer job", + "error", err, "remaining retry", retry) + continue } + + break + } + + if err != nil { + log.Warn("Error updating trainer job", "error", err) } } } @@ -491,7 +484,7 @@ func (a *Autoscaler) Monitor() { log.Info("sync cluster resource done", "resource", r) var js []job - for _, j := range a.jobs { + for key, j := range a.jobs { // k8s job for trainers may not be created immediently // try sync it here if j.TrainerJob == nil { @@ -505,7 +498,9 @@ func (a *Autoscaler) Monitor() { continue } j.TrainerJob = tj + a.jobs[key] = j } + // Scale jobs only when all pods' "Phase" are running. // Pending/starting/terminating jobs are ignored. total, running, err := a.cluster.JobPods(j.Config) @@ -518,10 +513,15 @@ func (a *Autoscaler) Monitor() { } } diff := scaleAllDryRun(js, r, a.maxLoadDesired) - if len(diff) > 0 { - log.Info("calculated scaling plan", "plan", diff, + target := make(map[string]int) + for k, v := range diff { + target[k] = int(*a.jobs[k].TrainerJob.Spec.Parallelism) + v + } + + if len(target) > 0 { + log.Info("calculated scaling plan", "target", target, "clusterResource", r) } - a.scaleAll(diff) + a.scaleAll(target) } } From ff620db63aa64dbba40a2e4adfc29e638130e55f Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Fri, 3 Nov 2017 00:05:08 +0000 Subject: [PATCH 11/11] Increase max_load_desired from 0.9 to 0.97 --- go/cmd/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/cmd/controller/controller.go b/go/cmd/controller/controller.go index 936085de..3f566a3f 100644 --- a/go/cmd/controller/controller.go +++ b/go/cmd/controller/controller.go @@ -17,7 +17,7 @@ import ( func main() { kubeconfig := flag.String("kubeconfig", "", "Path to a kube config. Only required if out-of-cluster.") logLevel := flag.String("log_level", "info", "Log level can be debug, info, warn, error, crit.") - maxLoadDesired := flag.Float64("max_load_desired", 0.9, `Keep the cluster max resource usage around + maxLoadDesired := flag.Float64("max_load_desired", 0.97, `Keep the cluster max resource usage around this value, jobs will scale down if total request is over this level.`) flag.Parse()
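
The heart of patches 01-06 is per-node idle accounting: SyncResource seeds NodesCPUIdleMilli / NodesMemoryFreeMega from each node's allocatable capacity, and updateNodesIdleResource subtracts the requests of every container (and init container) of every pod that is already bound to a node, skipping pods whose NodeName is still empty. Below is a minimal, dependency-free sketch of that bookkeeping; podInfo and seedIdle are illustrative stand-ins for the client-go types and the SyncResource loop, not names from the PR.

package main

import "fmt"

// podInfo is a stand-in for v1.Pod, reduced to what the bookkeeping needs.
type podInfo struct {
	NodeName        string // "" means the pod has not been scheduled yet
	CPURequestMilli int64  // sum of container (and init container) CPU requests
	MemRequestMega  int64  // sum of container (and init container) memory requests
}

// seedIdle initializes the per-node maps from each node's allocatable capacity,
// mirroring the loop over nodeList.Items in SyncResource.
func seedIdle(allocCPUMilli, allocMemMega map[string]int64) (cpuIdle, memFree map[string]int64) {
	cpuIdle = make(map[string]int64)
	memFree = make(map[string]int64)
	for name, c := range allocCPUMilli {
		cpuIdle[name] = c
	}
	for name, m := range allocMemMega {
		memFree[name] = m
	}
	return
}

// updateNodesIdleResource subtracts scheduled pods' requests from their node,
// as in go/controller/cluster.go after patch 06.
func updateNodesIdleResource(pods []podInfo, cpuIdle, memFree map[string]int64) {
	for _, p := range pods {
		if p.NodeName == "" {
			continue // pending pod, charges no node yet
		}
		cpuIdle[p.NodeName] -= p.CPURequestMilli
		memFree[p.NodeName] -= p.MemRequestMega
	}
}

func main() {
	cpuIdle, memFree := seedIdle(
		map[string]int64{"node-a": 4000, "node-b": 2000},
		map[string]int64{"node-a": 8192, "node-b": 4096},
	)
	pods := []podInfo{
		{NodeName: "node-a", CPURequestMilli: 2500, MemRequestMega: 3072},
		{NodeName: "", CPURequestMilli: 500, MemRequestMega: 512}, // not scheduled
	}
	updateNodesIdleResource(pods, cpuIdle, memFree)
	fmt.Println(cpuIdle, memFree) // node-a: 1500 / 5120, node-b unchanged
}

One note on the accounting: the patch charges init-container requests on top of regular container requests, whereas the Kubernetes scheduler charges a pod with the larger of (sum of app-container requests) and (largest init-container request). The PR's version is therefore conservative and slightly under-reports idle capacity, which is a safe direction for a scale-up guard.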
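
Patches 02-05 gate scale-up in scaleDryRun on that per-node data: searchAssignableNodeByCPU and searchAssignableNodeByMem each look for a single node with enough room for one more trainer, and scaleDryRun returns additional = 0 when either search fails. The standalone sketch below uses simplified types (trainerRequest and nodeInfos are illustrative); it also shows that, as written, the CPU check and the memory check may be satisfied by two different nodes, so the guard is necessary but not sufficient for the pod to actually fit on one node.

package main

import "fmt"

// trainerRequest is a stand-in for the job's per-trainer resource request.
type trainerRequest struct {
	CPUMilli int64
	MemMega  int64
}

// nodeInfos mirrors autoscaler.NodeInfos.
type nodeInfos struct {
	NodesCPUIdleMilli   map[string]int64
	NodesMemoryFreeMega map[string]int64
}

func searchAssignableNodeByCPU(n nodeInfos, req trainerRequest) bool {
	for _, idle := range n.NodesCPUIdleMilli {
		if req.CPUMilli <= idle {
			return true
		}
	}
	return false
}

func searchAssignableNodeByMem(n nodeInfos, req trainerRequest) bool {
	for _, free := range n.NodesMemoryFreeMega {
		if req.MemMega <= free {
			return true
		}
	}
	return false
}

// scaleUpAllowed is the shape of the guard added to scaleDryRun: if no node has
// room for one more trainer, no additional instance is proposed.
func scaleUpAllowed(n nodeInfos, req trainerRequest) bool {
	return searchAssignableNodeByCPU(n, req) && searchAssignableNodeByMem(n, req)
}

func main() {
	n := nodeInfos{
		NodesCPUIdleMilli:   map[string]int64{"node-a": 900, "node-b": 300},
		NodesMemoryFreeMega: map[string]int64{"node-a": 256, "node-b": 2048},
	}
	req := trainerRequest{CPUMilli: 800, MemMega: 1024}
	// CPU fits on node-a, memory fits on node-b: the guard passes even though
	// no single node can host the pod.
	fmt.Println(scaleUpAllowed(n, req)) // true
}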
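
Patches 07 and 09 also trade log volume for signal: NodeInfos implements fmt.Stringer, so the "sync cluster resource done" line prints a node count instead of dumping both maps. A small sketch of the mechanism, using the standard library logger rather than the PR's structured logger:

package main

import (
	"fmt"
	"log"
)

// NodeInfos mirrors the struct added in patch 07.
type NodeInfos struct {
	NodesCPUIdleMilli   map[string]int64
	NodesMemoryFreeMega map[string]int64
}

// String keeps default formatting of any value embedding NodeInfos down to one
// short token instead of two full maps.
func (n NodeInfos) String() string {
	return fmt.Sprintf("NodeInfo(%d nodes)", len(n.NodesCPUIdleMilli))
}

func main() {
	n := NodeInfos{
		NodesCPUIdleMilli:   map[string]int64{"node-a": 1500, "node-b": 2000},
		NodesMemoryFreeMega: map[string]int64{"node-a": 5120, "node-b": 4096},
	}
	log.Printf("sync cluster resource done, resource: %v", n) // ... NodeInfo(2 nodes)
}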
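
Patch 10 reworks scaleAll to take absolute parallelism targets instead of diffs and wraps each update in a retry loop: re-fetch the batchv1.Job so the write carries a fresh resource version, set Spec.Parallelism, call UpdateTrainerJob, and retry up to five times. Per the commit message, this also removes the need for patch 09's nil-TrainerJob guard, since Monitor now caches the freshly synced job and converts the dry-run diff into targets (current parallelism + diff) before calling scaleAll. The sketch below captures only the control flow; trainerCluster, trainerJob, and fakeCluster are hypothetical stand-ins, not the PR's actual Cluster interface or signatures.

package main

import (
	"errors"
	"fmt"
)

// trainerJob is a stand-in for *batchv1.Job, reduced to what scaleAll touches.
type trainerJob struct{ Parallelism int32 }

// trainerCluster is a hypothetical slice of the controller's cluster client.
type trainerCluster interface {
	GetTrainerJob(name string) (*trainerJob, error)
	UpdateTrainerJob(name string, j *trainerJob) error
}

// scaleAll applies absolute parallelism targets, as in patch 10: re-read the
// job before every write so the update sees the latest state, and retry the
// read-modify-write up to five times before giving up.
func scaleAll(c trainerCluster, target map[string]int32) {
	for name, want := range target {
		var err error
		for retry := 5; retry > 0; retry-- {
			var tj *trainerJob
			if tj, err = c.GetTrainerJob(name); err != nil {
				continue // transient read failure, retry
			}
			tj.Parallelism = want
			if err = c.UpdateTrainerJob(name, tj); err != nil {
				continue // e.g. optimistic-concurrency conflict, re-read and retry
			}
			break
		}
		if err != nil {
			fmt.Println("giving up on scaling", name, ":", err)
		}
	}
}

// fakeCluster fails the first update to exercise the retry path.
type fakeCluster struct {
	jobs     map[string]*trainerJob
	failures int
}

func (f *fakeCluster) GetTrainerJob(name string) (*trainerJob, error) {
	j, ok := f.jobs[name]
	if !ok {
		return nil, errors.New("not found")
	}
	cp := *j
	return &cp, nil
}

func (f *fakeCluster) UpdateTrainerJob(name string, j *trainerJob) error {
	if f.failures > 0 {
		f.failures--
		return errors.New("conflict")
	}
	f.jobs[name] = j
	return nil
}

func main() {
	c := &fakeCluster{jobs: map[string]*trainerJob{"mnist": {Parallelism: 3}}, failures: 1}
	scaleAll(c, map[string]int32{"mnist": 5})
	fmt.Println("parallelism now:", c.jobs["mnist"].Parallelism) // 5
}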
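
Patch 11 raises the default -max_load_desired from 0.9 to 0.97. Per the flag's help text, this is the ceiling on cluster-wide requested resources that the autoscaler tries to stay under when scaling up. The exact guard inside scaleDryRun is outside the hunks shown above, so the helper below only illustrates the threshold arithmetic the flag describes, under that assumption.

package main

import "fmt"

// withinLoadCap illustrates the arithmetic behind -max_load_desired: adding one
// more trainer is allowed only while total requested CPU stays under the cap.
func withinLoadCap(requestMilli, addMilli, totalMilli int64, maxLoadDesired float64) bool {
	return float64(requestMilli+addMilli) <= maxLoadDesired*float64(totalMilli)
}

func main() {
	const totalMilli = 400000 // a 400-CPU cluster
	// 380000m already requested, one more 4000m trainer proposed:
	fmt.Println(withinLoadCap(380000, 4000, totalMilli, 0.90)) // false: over the 360000m cap
	fmt.Println(withinLoadCap(380000, 4000, totalMilli, 0.97)) // true:  under the 388000m cap
}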