From a90aa2b3766d2a1a3bba96ae385b84a06024d713 Mon Sep 17 00:00:00 2001 From: Xuzheng Chang Date: Wed, 26 Jul 2023 15:46:14 +0800 Subject: [PATCH] remove node out of sync state --- pkg/scheduler/actions/allocate/allocate.go | 4 +-- pkg/scheduler/api/node_info.go | 42 +++++++++++----------- pkg/scheduler/api/resource_info.go | 10 +++--- pkg/scheduler/api/resource_info_test.go | 4 +-- pkg/scheduler/api/unschedule_info.go | 4 +++ pkg/scheduler/cache/event_handlers.go | 6 ---- 6 files changed, 34 insertions(+), 36 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index 1ad5a470a3f..9e9dd42fffe 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -99,8 +99,8 @@ func (alloc *Action) Execute(ssn *framework.Session) { allNodes := ssn.NodeList predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) { // Check for Resource Predicate - if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok { - return nil, api.NewFitError(task, node, reason) + if ok, resource := task.InitResreq.LessEqualWithResource(node.FutureIdle(), api.Zero); !ok { + return nil, api.NewFitError(task, node, api.WrapFitErrorReason(resource)) } var statusSets util.StatusSets statusSets, err := ssn.PredicateFn(task, node) diff --git a/pkg/scheduler/api/node_info.go b/pkg/scheduler/api/node_info.go index ae1585415a5..5af0bcc0b73 100644 --- a/pkg/scheduler/api/node_info.go +++ b/pkg/scheduler/api/node_info.go @@ -291,12 +291,8 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) { } // set NodeState according to resources - if !ni.Used.LessEqual(ni.Allocatable, Zero) { - ni.State = NodeState{ - Phase: NotReady, - Reason: "OutOfSync", - } - return + if ok, resource := ni.Used.LessEqualWithResource(ni.Allocatable, Zero); !ok { + klog.ErrorS(nil, "Node out of sync", "name", ni.Name, "resource", resource) } // If node not ready, e.g. power off @@ -372,30 +368,38 @@ func (ni *NodeInfo) setNode(node *v1.Node) { for _, ti := range ni.Tasks { switch ti.Status { case Releasing: - ni.Idle.sub(ti.Resreq) // sub without assertion + ni.allocateIdleResource(ti) ni.Releasing.Add(ti.Resreq) ni.Used.Add(ti.Resreq) ni.addResource(ti.Pod) case Pipelined: ni.Pipelined.Add(ti.Resreq) default: - ni.Idle.sub(ti.Resreq) // sub without assertion + ni.allocateIdleResource(ti) ni.Used.Add(ti.Resreq) ni.addResource(ti.Pod) } } } -func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) error { - if ti.Resreq.LessEqual(ni.Idle, Zero) { +func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) { + ok, resource := ti.Resreq.LessEqualWithResource(ni.Idle, Zero) + if ok { ni.Idle.Sub(ti.Resreq) - return nil + return } - return &AllocateFailError{Reason: fmt.Sprintf( - "cannot allocate resource, <%s> idle: %s <%s/%s> req: %s", - ni.Name, ni.Idle.String(), ti.Namespace, ti.Name, ti.Resreq.String(), - )} + klog.ErrorS(nil, "Cannot allocate resource, set idle resource to zero", + "nodeName", ni.Name, "task", klog.KObj(ti.Pod), "resource", resource, "idle", ni.Idle.String(), "req", ti.Resreq.String()) + // set idle resource zero to prevent pod schedule on current node. + switch resource { + case "cpu": + ni.Idle.MilliCPU = 0 + case "memory": + ni.Idle.Memory = 0 + default: + ni.Idle.ScalarResources[v1.ResourceName(resource)] = 0 + } } // AddTask is used to add a task in nodeInfo object @@ -420,18 +424,14 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error { if ni.Node != nil { switch ti.Status { case Releasing: - if err := ni.allocateIdleResource(ti); err != nil { - return err - } + ni.allocateIdleResource(ti) ni.Releasing.Add(ti.Resreq) ni.Used.Add(ti.Resreq) ni.addResource(ti.Pod) case Pipelined: ni.Pipelined.Add(ti.Resreq) default: - if err := ni.allocateIdleResource(ti); err != nil { - return err - } + ni.allocateIdleResource(ti) ni.Used.Add(ti.Resreq) ni.addResource(ti.Pod) } diff --git a/pkg/scheduler/api/resource_info.go b/pkg/scheduler/api/resource_info.go index b42f45fd11a..c77409d1de2 100644 --- a/pkg/scheduler/api/resource_info.go +++ b/pkg/scheduler/api/resource_info.go @@ -405,11 +405,11 @@ func (r *Resource) LessEqual(rr *Resource, defaultValue DimensionDefaultValue) b return true } -// LessEqualWithReason returns true, "" only on condition that all dimensions of resources in r are less than or equal with that of rr, +// LessEqualWithResource returns true, "" only on condition that all dimensions of resources in r are less than or equal with that of rr, // Otherwise returns false and err string ,which show which resource is insufficient. // @param defaultValue "default value for resource dimension not defined in ScalarResources. Its value can only be one of 'Zero' and 'Infinity'" // this function is the same as LessEqual , and it will be merged to LessEqual in the future -func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) { +func (r *Resource) LessEqualWithResource(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) { lessEqualFunc := func(l, r, diff float64) bool { if l < r || math.Abs(l-r) < diff { return true @@ -418,10 +418,10 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefau } if !lessEqualFunc(r.MilliCPU, rr.MilliCPU, minResource) { - return false, "Insufficient cpu" + return false, "cpu" } if !lessEqualFunc(r.Memory, rr.Memory, minResource) { - return false, "Insufficient memory" + return false, "memory" } for resourceName, leftValue := range r.ScalarResources { @@ -431,7 +431,7 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefau } if !lessEqualFunc(leftValue, rightValue, minResource) { - return false, "Insufficient " + string(resourceName) + return false, string(resourceName) } } return true, "" diff --git a/pkg/scheduler/api/resource_info_test.go b/pkg/scheduler/api/resource_info_test.go index f4f42d9dd5a..e43aac1dd62 100644 --- a/pkg/scheduler/api/resource_info_test.go +++ b/pkg/scheduler/api/resource_info_test.go @@ -1379,13 +1379,13 @@ func TestResource_LessEqualResource(t *testing.T) { } for _, test := range testsForDefaultZero { - _, reason := test.resource1.LessEqualWithReason(test.resource2, Zero) + _, reason := test.resource1.LessEqualWithResource(test.resource2, Zero) if !reflect.DeepEqual(test.expected, reason) { t.Errorf("expected: %#v, got: %#v", test.expected, reason) } } for caseID, test := range testsForDefaultInfinity { - _, reason := test.resource1.LessEqualWithReason(test.resource2, Infinity) + _, reason := test.resource1.LessEqualWithResource(test.resource2, Infinity) if !reflect.DeepEqual(test.expected, reason) { t.Errorf("caseID %d expected: %#v, got: %#v", caseID, test.expected, reason) } diff --git a/pkg/scheduler/api/unschedule_info.go b/pkg/scheduler/api/unschedule_info.go index e1c6310ac38..24dd9072eb0 100644 --- a/pkg/scheduler/api/unschedule_info.go +++ b/pkg/scheduler/api/unschedule_info.go @@ -114,3 +114,7 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError { func (f *FitError) Error() string { return fmt.Sprintf("task %s/%s on node %s fit failed: %s", f.taskNamespace, f.taskName, f.NodeName, strings.Join(f.Reasons, ", ")) } + +func WrapFitErrorReason(resource string) string { + return "Insufficient "+resource +} \ No newline at end of file diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index 29015b03311..5eaf070e5df 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -76,12 +76,6 @@ func (sc *SchedulerCache) addTask(pi *schedulingapi.TaskInfo) error { node := sc.Nodes[pi.NodeName] if !isTerminated(pi.Status) { if err := node.AddTask(pi); err != nil { - if _, outOfSync := err.(*schedulingapi.AllocateFailError); outOfSync { - node.State = schedulingapi.NodeState{ - Phase: schedulingapi.NotReady, - Reason: "OutOfSync", - } - } return err } } else {