diff --git a/installer/helm/chart/volcano/templates/scheduling_v1alpha2_queue.yaml b/installer/helm/chart/volcano/templates/scheduling_v1alpha2_queue.yaml index 850fd847448..2b3708cb749 100644 --- a/installer/helm/chart/volcano/templates/scheduling_v1alpha2_queue.yaml +++ b/installer/helm/chart/volcano/templates/scheduling_v1alpha2_queue.yaml @@ -25,9 +25,13 @@ spec: weight: format: int32 type: integer + state: + type: string type: object status: properties: + state: + type: string unknown: format: int32 type: integer @@ -37,6 +41,9 @@ spec: running: format: int32 type: integer + inqueue: + format: int32 + type: integer type: object type: object version: v1alpha2 diff --git a/installer/volcano-development.yaml b/installer/volcano-development.yaml index 39472049882..ae03d74b65a 100644 --- a/installer/volcano-development.yaml +++ b/installer/volcano-development.yaml @@ -782,9 +782,13 @@ spec: weight: format: int32 type: integer + state: + type: string type: object status: properties: + state: + type: string unknown: format: int32 type: integer @@ -794,6 +798,9 @@ spec: running: format: int32 type: integer + inqueue: + format: int32 + type: integer type: object type: object version: v1alpha2 diff --git a/pkg/apis/scheduling/types.go b/pkg/apis/scheduling/types.go index e075f5a15a2..cd014143d4b 100644 --- a/pkg/apis/scheduling/types.go +++ b/pkg/apis/scheduling/types.go @@ -202,12 +202,16 @@ type QueueStatus struct { Running int32 // The number of `Inqueue` PodGroup in this queue. Inqueue int32 + // State is status of queue + State string } // QueueSpec represents the template of Queue. type QueueSpec struct { Weight int32 Capability v1.ResourceList + // State controller the status of queue + State string } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go b/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go index 67690e9ea1a..872bf36df17 100644 --- a/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go +++ b/pkg/apis/scheduling/v1alpha1/zz_generated.conversion.go @@ -360,6 +360,7 @@ func Convert_v1alpha1_QueueSpec_To_scheduling_QueueSpec(in *QueueSpec, out *sche func autoConvert_scheduling_QueueSpec_To_v1alpha1_QueueSpec(in *scheduling.QueueSpec, out *QueueSpec, s conversion.Scope) error { out.Weight = in.Weight out.Capability = *(*v1.ResourceList)(unsafe.Pointer(&in.Capability)) + // WARNING: in.State requires manual conversion: does not exist in peer-type return nil } @@ -385,5 +386,6 @@ func autoConvert_scheduling_QueueStatus_To_v1alpha1_QueueStatus(in *scheduling.Q out.Pending = in.Pending out.Running = in.Running // WARNING: in.Inqueue requires manual conversion: does not exist in peer-type + // WARNING: in.State requires manual conversion: does not exist in peer-type return nil } diff --git a/pkg/apis/scheduling/v1alpha2/types.go b/pkg/apis/scheduling/v1alpha2/types.go index 2b39db6495d..f200fa5296d 100644 --- a/pkg/apis/scheduling/v1alpha2/types.go +++ b/pkg/apis/scheduling/v1alpha2/types.go @@ -24,6 +24,9 @@ import ( // PodGroupPhase is the phase of a pod group at the current time. type PodGroupPhase string +// QueueState is state type of queue +type QueueState string + // These are the valid phase of podGroups. const ( // PodPending means the pod group has been accepted by the system, but scheduler can not allocate @@ -44,6 +47,15 @@ const ( type PodGroupConditionType string +const ( + // QueueStateOpen indicate `Open` state of queue + QueueStateOpen QueueState = "Open" + // QueueStateClosed indicate `Closed` state of queue + QueueStateClosed QueueState = "Closed" + // QueueStateClosing indicate `Closing` state of queue + QueueStateClosing QueueState = "Closing" +) + const ( PodGroupUnschedulableType PodGroupConditionType = "Unschedulable" ) @@ -203,12 +215,16 @@ type QueueStatus struct { Running int32 `json:"running,omitempty" protobuf:"bytes,3,opt,name=running"` // The number of `Inqueue` PodGroup in this queue. Inqueue int32 `json:"inqueue,omitempty" protobuf:"bytes,4,opt,name=inqueue"` + // State is state of queue + State QueueState `json:"state,omitempty" protobuf:"bytes,5,opt,name=state"` } // QueueSpec represents the template of Queue. type QueueSpec struct { Weight int32 `json:"weight,omitempty" protobuf:"bytes,1,opt,name=weight"` Capability v1.ResourceList `json:"capability,omitempty" protobuf:"bytes,2,opt,name=capability"` + // State controller the status of queue + State QueueState `json:"state,omitempty" protobuf:"bytes,3,opt,name=state"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/scheduling/v1alpha2/zz_generated.conversion.go b/pkg/apis/scheduling/v1alpha2/zz_generated.conversion.go index b029c97db81..1e9581a1d33 100644 --- a/pkg/apis/scheduling/v1alpha2/zz_generated.conversion.go +++ b/pkg/apis/scheduling/v1alpha2/zz_generated.conversion.go @@ -324,6 +324,7 @@ func Convert_scheduling_QueueList_To_v1alpha2_QueueList(in *scheduling.QueueList func autoConvert_v1alpha2_QueueSpec_To_scheduling_QueueSpec(in *QueueSpec, out *scheduling.QueueSpec, s conversion.Scope) error { out.Weight = in.Weight out.Capability = *(*v1.ResourceList)(unsafe.Pointer(&in.Capability)) + out.State = string(in.State) return nil } @@ -335,6 +336,7 @@ func Convert_v1alpha2_QueueSpec_To_scheduling_QueueSpec(in *QueueSpec, out *sche func autoConvert_scheduling_QueueSpec_To_v1alpha2_QueueSpec(in *scheduling.QueueSpec, out *QueueSpec, s conversion.Scope) error { out.Weight = in.Weight out.Capability = *(*v1.ResourceList)(unsafe.Pointer(&in.Capability)) + out.State = QueueState(in.State) return nil } @@ -348,6 +350,7 @@ func autoConvert_v1alpha2_QueueStatus_To_scheduling_QueueStatus(in *QueueStatus, out.Pending = in.Pending out.Running = in.Running out.Inqueue = in.Inqueue + out.State = string(in.State) return nil } @@ -361,6 +364,7 @@ func autoConvert_scheduling_QueueStatus_To_v1alpha2_QueueStatus(in *scheduling.Q out.Pending = in.Pending out.Running = in.Running out.Inqueue = in.Inqueue + out.State = QueueState(in.State) return nil } diff --git a/pkg/controllers/queue/queue_controller.go b/pkg/controllers/queue/queue_controller.go index fbdbe3a01bd..1d76ef995e4 100644 --- a/pkg/controllers/queue/queue_controller.go +++ b/pkg/controllers/queue/queue_controller.go @@ -17,7 +17,6 @@ limitations under the License. package queue import ( - "fmt" "reflect" "sync" @@ -86,6 +85,7 @@ func NewQueueController( queueInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: c.addQueue, + UpdateFunc: c.updateQueue, DeleteFunc: c.deleteQueue, }) @@ -139,31 +139,26 @@ func (c *Controller) processNextWorkItem() bool { return true } -func (c *Controller) getPodGroups(key string) ([]string, error) { +func (c *Controller) getPodGroups(key string) []string { c.pgMutex.RLock() defer c.pgMutex.RUnlock() if c.podGroups[key] == nil { - return nil, fmt.Errorf("queue %s has not been seen or deleted", key) + return nil } podGroups := make([]string, 0, len(c.podGroups[key])) for pgKey := range c.podGroups[key] { podGroups = append(podGroups, pgKey) } - return podGroups, nil + return podGroups } func (c *Controller) syncQueue(key string) error { glog.V(4).Infof("Begin sync queue %s", key) - podGroups, err := c.getPodGroups(key) - if err != nil { - return err - } - queueStatus := schedulingv1alpha2.QueueStatus{} - + podGroups := c.getPodGroups(key) for _, pgKey := range podGroups { // Ignore error here, tt can not occur. ns, name, _ := cache.SplitMetaNamespaceKey(pgKey) @@ -196,6 +191,23 @@ func (c *Controller) syncQueue(key string) error { return err } + // If the `state` value is empty, the status of queue will be set as `Open` + // If the `state` value is `Open`, then the status of queue will also be `Open` + // If the `state` value is `Closed`, then we need to further consider whether there + // is a podgroup under the queue. if there is a podgroup under the queue, the status + // of the queue will be set as `Closing`, while if there is no podgroup under the queue, + // the status of queue will be set as `Stopped`. + queueStatus.State = queue.Spec.State + if len(queueStatus.State) == 0 { + queueStatus.State = schedulingv1alpha2.QueueStateOpen + } + + if queueStatus.State == schedulingv1alpha2.QueueStateClosed { + if len(podGroups) != 0 { + queueStatus.State = schedulingv1alpha2.QueueStateClosing + } + } + // ignore update when status does not change if reflect.DeepEqual(queueStatus, queue.Status) { return nil @@ -217,6 +229,28 @@ func (c *Controller) addQueue(obj interface{}) { c.queue.Add(queue.Name) } +func (c *Controller) updateQueue(old, new interface{}) { + oldQueue, ok := old.(*schedulingv1alpha2.Queue) + if !ok { + glog.Errorf("Can not covert old object %v to queues.scheduling.sigs.dev", old) + return + } + + newQueue, ok := new.(*schedulingv1alpha2.Queue) + if !ok { + glog.Errorf("Can not covert new object %v to queues.scheduling.sigs.dev", old) + return + } + + if oldQueue.ResourceVersion == newQueue.ResourceVersion { + return + } + + c.addQueue(newQueue) + + return +} + func (c *Controller) deleteQueue(obj interface{}) { queue, ok := obj.(*schedulingv1alpha2.Queue) if !ok { diff --git a/pkg/controllers/queue/queue_controller_test.go b/pkg/controllers/queue/queue_controller_test.go index 947da26d948..1bb5895ca7e 100644 --- a/pkg/controllers/queue/queue_controller_test.go +++ b/pkg/controllers/queue/queue_controller_test.go @@ -66,6 +66,54 @@ func TestAddQueue(t *testing.T) { } } +func TestUpdateQueue(t *testing.T) { + testCases := []struct { + Name string + oldQueue *schedulingv1alpha2.Queue + newQueue *schedulingv1alpha2.Queue + ExpectValue string + }{ + { + Name: "UpdateQueue", + oldQueue: &schedulingv1alpha2.Queue{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "v1", + Name: "update-queue", + }, + Spec: schedulingv1alpha2.QueueSpec{ + Weight: 1, + State: schedulingv1alpha2.QueueStateOpen, + }, + }, + newQueue: &schedulingv1alpha2.Queue{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "v2", + Name: "update-queue", + }, + Spec: schedulingv1alpha2.QueueSpec{ + Weight: 1, + State: schedulingv1alpha2.QueueStateClosed, + }, + }, + ExpectValue: "update-queue", + }, + } + + for _, testCase := range testCases { + c := newFakeController() + + c.updateQueue(testCase.oldQueue, testCase.newQueue) + item, shutdown := c.queue.Get() + if true == shutdown { + t.Errorf("Case %s failed for queue shutdown", testCase.Name) + } + + if item != testCase.ExpectValue { + t.Errorf("Case %s failed, expect %v, got %v", testCase.Name, testCase.ExpectValue, item) + } + } +} + func TestDeleteQueue(t *testing.T) { testCases := []struct { Name string