Skip to content

Commit

Permalink
add queue controller about state
Browse files Browse the repository at this point in the history
  • Loading branch information
sivanzcw committed Nov 7, 2019
1 parent 2d80438 commit 708fa42
Show file tree
Hide file tree
Showing 10 changed files with 658 additions and 133 deletions.
22 changes: 22 additions & 0 deletions pkg/apis/scheduling/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,28 @@ const (
NotEnoughPodsReason string = "NotEnoughTasks"
)

// QueueEvent represent the phase of queue
type QueueEvent string

const (
// QueueOutOfSyncEvent is triggered if PodGroup/Queue were updated
QueueOutOfSyncEvent QueueEvent = "OutOfSync"
// QueueCommandIssuedEvent is triggered if a command is raised by user
QueueCommandIssuedEvent QueueEvent = "CommandIssued"
)

// QueueAction is the action that queue controller will take according to the event.
type QueueAction string

const (
// SyncQueueAction is the action to sync queue status.
SyncQueueAction QueueAction = "SyncQueue"
// OpenQueueAction is the action to open queue
OpenQueueAction QueueAction = "OpenQueue"
// CloseQueueAction is the action to close queue
CloseQueueAction QueueAction = "CloseQueue"
)

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

Expand Down
27 changes: 23 additions & 4 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/client-go/util/workqueue"

batchv1alpha1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
busv1alpha1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
vcclientset "volcano.sh/volcano/pkg/client/clientset/versioned"
vcscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme"
informerfactory "volcano.sh/volcano/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -126,7 +127,7 @@ func NewJobController(
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller"})
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controllers"})

cc := &Controller{
kubeClient: kubeClient,
Expand Down Expand Up @@ -155,9 +156,27 @@ func NewJobController(
cc.jobSynced = cc.jobInformer.Informer().HasSynced

cc.cmdInformer = informerfactory.NewSharedInformerFactory(cc.vcClient, 0).Bus().V1alpha1().Commands()
cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
})
cc.cmdInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch obj.(type) {
case *busv1alpha1.Command:
cmd := obj.(*busv1alpha1.Command)
if cmd.TargetObject != nil &&
cmd.TargetObject.APIVersion == batchv1alpha1.SchemeGroupVersion.String() &&
cmd.TargetObject.Kind == "Job" {
return true
}

return false
default:
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
},
})
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

Expand Down
Loading

0 comments on commit 708fa42

Please sign in to comment.