Skip to content

Commit

Permalink
add state parameter for queue
Browse files Browse the repository at this point in the history
  • Loading branch information
sivanzcw committed Nov 4, 2019
1 parent b66888f commit f1d8f9d
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ spec:
weight:
format: int32
type: integer
state:
type: string
type: object
status:
properties:
state:
type: string
unknown:
format: int32
type: integer
Expand All @@ -37,6 +41,9 @@ spec:
running:
format: int32
type: integer
inqueue:
format: int32
type: integer
type: object
type: object
version: v1alpha2
Expand Down
7 changes: 7 additions & 0 deletions installer/volcano-development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -782,9 +782,13 @@ spec:
weight:
format: int32
type: integer
state:
type: string
type: object
status:
properties:
state:
type: string
unknown:
format: int32
type: integer
Expand All @@ -794,6 +798,9 @@ spec:
running:
format: int32
type: integer
inqueue:
format: int32
type: integer
type: object
type: object
version: v1alpha2
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/scheduling/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,16 @@ type QueueStatus struct {
Running int32
// The number of `Inqueue` PodGroup in this queue.
Inqueue int32
// State is status of queue
State string
}

// QueueSpec represents the template of Queue.
type QueueSpec struct {
Weight int32
Capability v1.ResourceList
// State controller the status of queue
State string
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions pkg/apis/scheduling/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import (
// PodGroupPhase is the phase of a pod group at the current time.
type PodGroupPhase string

// QueueState is state type of queue
type QueueState string

// These are the valid phase of podGroups.
const (
// PodPending means the pod group has been accepted by the system, but scheduler can not allocate
Expand All @@ -44,6 +47,15 @@ const (

type PodGroupConditionType string

const (
// QueueStateOpen indicate `Open` state of queue
QueueStateOpen QueueState = "Open"
// QueueStateClosed indicate `Closed` state of queue
QueueStateClosed QueueState = "Closed"
// QueueStateClosing indicate `Closing` state of queue
QueueStateClosing QueueState = "Closing"
)

const (
PodGroupUnschedulableType PodGroupConditionType = "Unschedulable"
)
Expand Down Expand Up @@ -203,12 +215,16 @@ type QueueStatus struct {
Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"`
// The number of `Inqueue` PodGroup in this queue.
Inqueue int32 `json:"inqueue,omitempty" protobuf:"bytes,4,opt,name=inqueue"`
// State is state of queue
State QueueState `json:"state,omitempty" protobuf:"bytes,5,opt,name=state"`
}

// QueueSpec represents the template of Queue.
type QueueSpec struct {
Weight int32 `json:"weight,omitempty" protobuf:"bytes,1,opt,name=weight"`
Capability v1.ResourceList `json:"capability,omitempty" protobuf:"bytes,2,opt,name=capability"`
// State controller the status of queue
State QueueState `json:"state,omitempty" protobuf:"bytes,3,opt,name=state"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/scheduling/v1alpha2/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 44 additions & 10 deletions pkg/controllers/queue/queue_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package queue

import (
"fmt"
"reflect"
"sync"

Expand Down Expand Up @@ -86,6 +85,7 @@ func NewQueueController(

queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addQueue,
UpdateFunc: c.updateQueue,
DeleteFunc: c.deleteQueue,
})

Expand Down Expand Up @@ -139,31 +139,26 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

func (c *Controller) getPodGroups(key string) ([]string, error) {
func (c *Controller) getPodGroups(key string) []string {
c.pgMutex.RLock()
defer c.pgMutex.RUnlock()

if c.podGroups[key] == nil {
return nil, fmt.Errorf("queue %s has not been seen or deleted", key)
return nil
}
podGroups := make([]string, 0, len(c.podGroups[key]))
for pgKey := range c.podGroups[key] {
podGroups = append(podGroups, pgKey)
}

return podGroups, nil
return podGroups
}

func (c *Controller) syncQueue(key string) error {
glog.V(4).Infof("Begin sync queue %s", key)

podGroups, err := c.getPodGroups(key)
if err != nil {
return err
}

queueStatus := schedulingv1alpha2.QueueStatus{}

podGroups := c.getPodGroups(key)
for _, pgKey := range podGroups {
// Ignore error here, tt can not occur.
ns, name, _ := cache.SplitMetaNamespaceKey(pgKey)
Expand Down Expand Up @@ -196,6 +191,23 @@ func (c *Controller) syncQueue(key string) error {
return err
}

// If the `state` value is empty, the status of queue will be set as `Open`
// If the `state` value is `Open`, then the status of queue will also be `Open`
// If the `state` value is `Closed`, then we need to further consider whether there
// is a podgroup under the queue. if there is a podgroup under the queue, the status
// of the queue will be set as `Closing`, while if there is no podgroup under the queue,
// the status of queue will be set as `Stopped`.
queueStatus.State = queue.Spec.State
if len(queueStatus.State) == 0 {
queueStatus.State = schedulingv1alpha2.QueueStateOpen
}

if queueStatus.State == schedulingv1alpha2.QueueStateClosed {
if len(podGroups) != 0 {
queueStatus.State = schedulingv1alpha2.QueueStateClosing
}
}

// ignore update when status does not change
if reflect.DeepEqual(queueStatus, queue.Status) {
return nil
Expand All @@ -217,6 +229,28 @@ func (c *Controller) addQueue(obj interface{}) {
c.queue.Add(queue.Name)
}

func (c *Controller) updateQueue(old, new interface{}) {
oldQueue, ok := old.(*schedulingv1alpha2.Queue)
if !ok {
glog.Errorf("Can not covert old object %v to queues.scheduling.sigs.dev", old)
return
}

newQueue, ok := new.(*schedulingv1alpha2.Queue)
if !ok {
glog.Errorf("Can not covert new object %v to queues.scheduling.sigs.dev", old)
return
}

if oldQueue.ResourceVersion == newQueue.ResourceVersion {
return
}

c.addQueue(newQueue)

return
}

func (c *Controller) deleteQueue(obj interface{}) {
queue, ok := obj.(*schedulingv1alpha2.Queue)
if !ok {
Expand Down
48 changes: 48 additions & 0 deletions pkg/controllers/queue/queue_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,54 @@ func TestAddQueue(t *testing.T) {
}
}

func TestUpdateQueue(t *testing.T) {
testCases := []struct {
Name string
oldQueue *schedulingv1alpha2.Queue
newQueue *schedulingv1alpha2.Queue
ExpectValue string
}{
{
Name: "UpdateQueue",
oldQueue: &schedulingv1alpha2.Queue{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "v1",
Name: "update-queue",
},
Spec: schedulingv1alpha2.QueueSpec{
Weight: 1,
State: schedulingv1alpha2.QueueStateOpen,
},
},
newQueue: &schedulingv1alpha2.Queue{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "v2",
Name: "update-queue",
},
Spec: schedulingv1alpha2.QueueSpec{
Weight: 1,
State: schedulingv1alpha2.QueueStateClosed,
},
},
ExpectValue: "update-queue",
},
}

for _, testCase := range testCases {
c := newFakeController()

c.updateQueue(testCase.oldQueue, testCase.newQueue)
item, shutdown := c.queue.Get()
if true == shutdown {
t.Errorf("Case %s failed for queue shutdown", testCase.Name)
}

if item != testCase.ExpectValue {
t.Errorf("Case %s failed, expect %v, got %v", testCase.Name, testCase.ExpectValue, item)
}
}
}

func TestDeleteQueue(t *testing.T) {
testCases := []struct {
Name string
Expand Down

0 comments on commit f1d8f9d

Please sign in to comment.