Skip to content

Commit

Permalink
Merge pull request volcano-retired#26 from k82cn/kb_861
Browse files Browse the repository at this point in the history
Ignore nodes if out of syc.
  • Loading branch information
volcano-sh-bot authored Jun 11, 2019
2 parents 5dd517e + 60a5413 commit e191179
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 16 deletions.
88 changes: 72 additions & 16 deletions pkg/scheduler/api/node_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package api
import (
"fmt"

"github.com/golang/glog"

v1 "k8s.io/api/core/v1"
)

Expand All @@ -27,6 +29,9 @@ type NodeInfo struct {
Name string
Node *v1.Node

// The state of node
State NodeState

// The releasing resource on that node
Releasing *Resource
// The idle resource on that node
Expand All @@ -44,10 +49,18 @@ type NodeInfo struct {
Other interface{}
}

// NodeState defines the current state of node.
type NodeState struct {
Phase NodePhase
Reason string
}

// NewNodeInfo is used to create new nodeInfo object
func NewNodeInfo(node *v1.Node) *NodeInfo {
var ni *NodeInfo

if node == nil {
return &NodeInfo{
ni = &NodeInfo{
Releasing: EmptyResource(),
Idle: EmptyResource(),
Used: EmptyResource(),
Expand All @@ -57,21 +70,25 @@ func NewNodeInfo(node *v1.Node) *NodeInfo {

Tasks: make(map[TaskID]*TaskInfo),
}
}

return &NodeInfo{
Name: node.Name,
Node: node,
} else {
ni = &NodeInfo{
Name: node.Name,
Node: node,

Releasing: EmptyResource(),
Idle: NewResource(node.Status.Allocatable),
Used: EmptyResource(),
Releasing: EmptyResource(),
Idle: NewResource(node.Status.Allocatable),
Used: EmptyResource(),

Allocatable: NewResource(node.Status.Allocatable),
Capability: NewResource(node.Status.Capacity),
Allocatable: NewResource(node.Status.Allocatable),
Capability: NewResource(node.Status.Capacity),

Tasks: make(map[TaskID]*TaskInfo),
Tasks: make(map[TaskID]*TaskInfo),
}
}

ni.setNodeState(node)

return ni
}

// Clone used to clone nodeInfo Object
Expand All @@ -85,8 +102,47 @@ func (ni *NodeInfo) Clone() *NodeInfo {
return res
}

// Ready returns whether node is ready for scheduling
func (ni *NodeInfo) Ready() bool {
return ni.State.Phase == Ready
}

func (ni *NodeInfo) setNodeState(node *v1.Node) {
// If node is nil, the node is un-initialized in cache
if node == nil {
ni.State = NodeState{
Phase: NotReady,
Reason: "UnInitialized",
}
return
}

// set NodeState according to resources
if !ni.Used.LessEqual(NewResource(node.Status.Allocatable)) {
ni.State = NodeState{
Phase: NotReady,
Reason: "OutOfSync",
}
return
}

// Node is ready (ignore node conditions because of taint/toleration)
ni.State = NodeState{
Phase: Ready,
Reason: "",
}
}

// SetNode sets kubernetes node object to nodeInfo object
func (ni *NodeInfo) SetNode(node *v1.Node) {
ni.setNodeState(node)

if !ni.Ready() {
glog.Warningf("Failed to set node info, phase: %s, reason: %s",
ni.State.Phase, ni.State.Reason)
return
}

ni.Name = node.Name
ni.Node = node

Expand Down Expand Up @@ -176,16 +232,16 @@ func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error {

// String returns nodeInfo details in string format
func (ni NodeInfo) String() string {
res := ""
tasks := ""

i := 0
for _, task := range ni.Tasks {
res = res + fmt.Sprintf("\n\t %d: %v", i, task)
tasks = tasks + fmt.Sprintf("\n\t %d: %v", i, task)
i++
}

return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, taints <%v>%s",
ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.Node.Spec.Taints, res)
return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>, state <phase %s, reaseon %s>, taints <%v>%s",
ni.Name, ni.Idle, ni.Used, ni.Releasing, ni.State.Phase, ni.State.Reason, ni.Node.Spec.Taints, tasks)

}

Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/api/node_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func TestNodeInfo_AddPod(t *testing.T) {
Releasing: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p2": NewTaskInfo(case01Pod2),
Expand Down Expand Up @@ -106,6 +107,7 @@ func TestNodeInfo_RemovePod(t *testing.T) {
Releasing: EmptyResource(),
Allocatable: buildResource("8000m", "10G"),
Capability: buildResource("8000m", "10G"),
State: NodeState{Phase: Ready},
Tasks: map[TaskID]*TaskInfo{
"c1/p1": NewTaskInfo(case01Pod1),
"c1/p3": NewTaskInfo(case01Pod3),
Expand Down
21 changes: 21 additions & 0 deletions pkg/scheduler/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,27 @@ func (ts TaskStatus) String() string {
}
}

// NodePhase defines the phase of node
type NodePhase int

const (
// Ready means the node is ready for scheduling
Ready NodePhase = 1 << iota
// NotReady means the node is not ready for scheduling
NotReady
)

func (np NodePhase) String() string {
switch np {
case Ready:
return "Ready"
case NotReady:
return "NotReady"
}

return "Unknown"
}

// validateStatusUpdate validates whether the status transfer is valid.
func validateStatusUpdate(oldStatus, newStatus TaskStatus) error {
return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,10 @@ func (sc *SchedulerCache) Snapshot() *kbapi.ClusterInfo {
}

for _, value := range sc.Nodes {
if !value.Ready() {
continue
}

snapshot.Nodes[value.Name] = value.Clone()
}

Expand Down

0 comments on commit e191179

Please sign in to comment.