Skip to content

Commit

Permalink
remove node out of sync state
Browse files Browse the repository at this point in the history
Signed-off-by: Xuzheng Chang <changxuzheng@huawei.com>
  • Loading branch information
Monokaix committed Jul 27, 2023
1 parent 03b5da4 commit cbe8d0a
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 40 deletions.
4 changes: 2 additions & 2 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ func (alloc *Action) Execute(ssn *framework.Session) {
allNodes := ssn.NodeList
predicateFn := func(task *api.TaskInfo, node *api.NodeInfo) ([]*api.Status, error) {
// Check for Resource Predicate
if ok, reason := task.InitResreq.LessEqualWithReason(node.FutureIdle(), api.Zero); !ok {
return nil, api.NewFitError(task, node, reason)
if ok, resource := task.InitResreq.LessEqualWithResource(node.FutureIdle(), api.Zero); !ok {
return nil, api.NewFitError(task, node, api.WrapInsufficientResourceReason(resource))
}
var statusSets util.StatusSets
statusSets, err := ssn.PredicateFn(task, node)
Expand Down
42 changes: 21 additions & 21 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,8 @@ func (ni *NodeInfo) setNodeState(node *v1.Node) {
}

// set NodeState according to resources
if !ni.Used.LessEqual(ni.Allocatable, Zero) {
ni.State = NodeState{
Phase: NotReady,
Reason: "OutOfSync",
}
return
if ok, resource := ni.Used.LessEqualWithResource(ni.Allocatable, Zero); !ok {
klog.ErrorS(nil, "Node out of sync", "name", ni.Name, "resource", resource)
}

// If node not ready, e.g. power off
Expand Down Expand Up @@ -372,30 +368,38 @@ func (ni *NodeInfo) setNode(node *v1.Node) {
for _, ti := range ni.Tasks {
switch ti.Status {
case Releasing:
ni.Idle.sub(ti.Resreq) // sub without assertion
ni.allocateIdleResource(ti)
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
ni.Idle.sub(ti.Resreq) // sub without assertion
ni.allocateIdleResource(ti)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
}
}
}

func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) error {
if ti.Resreq.LessEqual(ni.Idle, Zero) {
func (ni *NodeInfo) allocateIdleResource(ti *TaskInfo) {
ok, resource := ti.Resreq.LessEqualWithResource(ni.Idle, Zero)
if ok {
ni.Idle.Sub(ti.Resreq)
return nil
return
}

return &AllocateFailError{Reason: fmt.Sprintf(
"cannot allocate resource, <%s> idle: %s <%s/%s> req: %s",
ni.Name, ni.Idle.String(), ti.Namespace, ti.Name, ti.Resreq.String(),
)}
klog.ErrorS(nil, "Cannot allocate resource, set idle resource to zero",
"nodeName", ni.Name, "task", klog.KObj(ti.Pod), "resource", resource, "idle", ni.Idle.String(), "req", ti.Resreq.String())
// set idle resource zero to prevent pod schedule on current node.
switch resource {
case "cpu":
ni.Idle.MilliCPU = 0
case "memory":
ni.Idle.Memory = 0
default:
ni.Idle.ScalarResources[v1.ResourceName(resource)] = 0
}
}

// AddTask is used to add a task in nodeInfo object
Expand All @@ -420,18 +424,14 @@ func (ni *NodeInfo) AddTask(task *TaskInfo) error {
if ni.Node != nil {
switch ti.Status {
case Releasing:
if err := ni.allocateIdleResource(ti); err != nil {
return err
}
ni.allocateIdleResource(ti)
ni.Releasing.Add(ti.Resreq)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
case Pipelined:
ni.Pipelined.Add(ti.Resreq)
default:
if err := ni.allocateIdleResource(ti); err != nil {
return err
}
ni.allocateIdleResource(ti)
ni.Used.Add(ti.Resreq)
ni.addResource(ti.Pod)
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/scheduler/api/resource_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,11 @@ func (r *Resource) LessEqual(rr *Resource, defaultValue DimensionDefaultValue) b
return true
}

// LessEqualWithReason returns true, "" only on condition that all dimensions of resources in r are less than or equal with that of rr,
// LessEqualWithResource returns true, "" only on condition that all dimensions of resources in r are less than or equal to those of rr.
// Otherwise it returns false and the name of the first resource found to be insufficient.
// @param defaultValue "default value for resource dimension not defined in ScalarResources. Its value can only be one of 'Zero' and 'Infinity'"
// This function is the same as LessEqual, and it will be merged into LessEqual in the future.
func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) {
func (r *Resource) LessEqualWithResource(rr *Resource, defaultValue DimensionDefaultValue) (bool, string) {
lessEqualFunc := func(l, r, diff float64) bool {
if l < r || math.Abs(l-r) < diff {
return true
Expand All @@ -418,10 +418,10 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefau
}

if !lessEqualFunc(r.MilliCPU, rr.MilliCPU, minResource) {
return false, "Insufficient cpu"
return false, "cpu"
}
if !lessEqualFunc(r.Memory, rr.Memory, minResource) {
return false, "Insufficient memory"
return false, "memory"
}

for resourceName, leftValue := range r.ScalarResources {
Expand All @@ -431,7 +431,7 @@ func (r *Resource) LessEqualWithReason(rr *Resource, defaultValue DimensionDefau
}

if !lessEqualFunc(leftValue, rightValue, minResource) {
return false, "Insufficient " + string(resourceName)
return false, string(resourceName)
}
}
return true, ""
Expand Down
12 changes: 6 additions & 6 deletions pkg/scheduler/api/resource_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1279,7 +1279,7 @@ func TestResource_LessEqualResource(t *testing.T) {
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
},
resource2: &Resource{},
expected: "Insufficient cpu",
expected: "cpu",
},
{
resource1: &Resource{
Expand Down Expand Up @@ -1331,7 +1331,7 @@ func TestResource_LessEqualResource(t *testing.T) {
Memory: 8000,
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
},
expected: "Insufficient scalar.test/scalar1",
expected: "scalar.test/scalar1",
},
{
resource1: &Resource{
Expand All @@ -1344,7 +1344,7 @@ func TestResource_LessEqualResource(t *testing.T) {
Memory: 8000,
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 4000, "hugepages-test": 5000},
},
expected: "Insufficient cpu",
expected: "cpu",
},
}

Expand Down Expand Up @@ -1374,18 +1374,18 @@ func TestResource_LessEqualResource(t *testing.T) {
ScalarResources: map[v1.ResourceName]float64{"scalar.test/scalar1": 1000, "hugepages-test": 2000},
},
resource2: &Resource{},
expected: "Insufficient cpu",
expected: "cpu",
},
}

for _, test := range testsForDefaultZero {
_, reason := test.resource1.LessEqualWithReason(test.resource2, Zero)
_, reason := test.resource1.LessEqualWithResource(test.resource2, Zero)
if !reflect.DeepEqual(test.expected, reason) {
t.Errorf("expected: %#v, got: %#v", test.expected, reason)
}
}
for caseID, test := range testsForDefaultInfinity {
_, reason := test.resource1.LessEqualWithReason(test.resource2, Infinity)
_, reason := test.resource1.LessEqualWithResource(test.resource2, Infinity)
if !reflect.DeepEqual(test.expected, reason) {
t.Errorf("caseID %d expected: %#v, got: %#v", caseID, test.expected, reason)
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/scheduler/api/unschedule_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,8 @@ func NewFitError(task *TaskInfo, node *NodeInfo, message ...string) *FitError {
// Error implements the error interface. The message names the task
// (namespace/name), the node, and all recorded reasons joined by ", ".
func (f *FitError) Error() string {
	joinedReasons := strings.Join(f.Reasons, ", ")
	return fmt.Sprintf(
		"task %s/%s on node %s fit failed: %s",
		f.taskNamespace, f.taskName, f.NodeName, joinedReasons,
	)
}

// WrapInsufficientResourceReason builds the fit-failure reason for the given
// resource name by prefixing it with "Insufficient ", e.g. "cpu" becomes
// "Insufficient cpu".
func WrapInsufficientResourceReason(resource string) string {
	const prefix = "Insufficient "
	return prefix + resource
}
6 changes: 0 additions & 6 deletions pkg/scheduler/cache/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ func (sc *SchedulerCache) addTask(pi *schedulingapi.TaskInfo) error {
node := sc.Nodes[pi.NodeName]
if !isTerminated(pi.Status) {
if err := node.AddTask(pi); err != nil {
if _, outOfSync := err.(*schedulingapi.AllocateFailError); outOfSync {
node.State = schedulingapi.NodeState{
Phase: schedulingapi.NotReady,
Reason: "OutOfSync",
}
}
return err
}
} else {
Expand Down

0 comments on commit cbe8d0a

Please sign in to comment.