Skip to content

Commit

Permalink
Merge pull request volcano-sh#92 from hzxuzhonghu/job-controller
Browse files Browse the repository at this point in the history
  • Loading branch information
Klaus Ma authored Apr 18, 2019
2 parents 60e8741 + 22879b0 commit 1be22b5
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 168 deletions.
10 changes: 0 additions & 10 deletions pkg/apis/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,6 @@ func CreateConfigMapIfNotExist(job *vkv1.Job, kubeClients *kubernetes.Clientset,
}

func DeleteConfigmap(job *vkv1.Job, kubeClients *kubernetes.Clientset, cmName string) error {
if _, err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Get(cmName, metav1.GetOptions{}); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get Configmap for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
} else {
return nil
}
}

if err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Delete(cmName, nil); err != nil {
if !apierrors.IsNotFound(err) {
glog.Errorf("Failed to delete Configmap of Job %v/%v: %v",
Expand Down
52 changes: 6 additions & 46 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@ limitations under the License.
package job

import (
"fmt"

"github.com/golang/glog"

"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
Expand All @@ -38,8 +35,6 @@ import (
kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1"
kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1"

v1corev1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/apis/helpers"
vkver "volcano.sh/volcano/pkg/client/clientset/versioned"
vkscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme"
vkinfoext "volcano.sh/volcano/pkg/client/informers/externalversions"
Expand Down Expand Up @@ -127,53 +122,18 @@ func NewJobController(config *rest.Config) *Controller {
cc.jobSynced = cc.jobInformer.Informer().HasSynced

cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Bus().V1alpha1().Commands()
cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1corev1.Command:
return helpers.ControlledBy(t, helpers.JobKind)
case cache.DeletedFinalStateUnknown:
if cmd, ok := t.Obj.(*v1corev1.Command); ok {
return helpers.ControlledBy(cmd, helpers.JobKind)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Command", obj))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
},
cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addCommand,
})
cc.cmdLister = cc.cmdInformer.Lister()
cc.cmdSynced = cc.cmdInformer.Informer().HasSynced

cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0)
podInformer := cc.sharedInformers.Core().V1().Pods()

podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: func(obj interface{}) bool {
switch t := obj.(type) {
case *v1.Pod:
return helpers.ControlledBy(t, helpers.JobKind)
case cache.DeletedFinalStateUnknown:
if pod, ok := t.Obj.(*v1.Pod); ok {
return helpers.ControlledBy(pod, helpers.JobKind)
}
runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj))
return false
default:
runtime.HandleError(fmt.Errorf("unable to handle object %T", obj))
return false
}
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
DeleteFunc: cc.deletePod,
},
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cc.addPod,
UpdateFunc: cc.updatePod,
DeleteFunc: cc.deletePod,
})

cc.podLister = podInformer.Lister()
Expand Down
156 changes: 59 additions & 97 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,38 +66,20 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
continue
}

switch pod.Status.Phase {
case v1.PodRunning:
err := cc.deleteJobPod(job.Name, pod)
if err != nil {
running++
errs = append(errs, err)
continue
}
if err := cc.deleteJobPod(job.Name, pod); err == nil {
terminating++
case v1.PodPending:
err := cc.deleteJobPod(job.Name, pod)
if err != nil {
} else {
errs = append(errs, err)
switch pod.Status.Phase {
case v1.PodRunning:
running++
case v1.PodPending:
pending++
errs = append(errs, err)
continue
}
terminating++
case v1.PodSucceeded:
err := cc.deleteJobPod(job.Name, pod)
if err != nil {
case v1.PodSucceeded:
succeeded++
errs = append(errs, err)
continue
}
case v1.PodFailed:
err := cc.deleteJobPod(job.Name, pod)
if err != nil {
case v1.PodFailed:
failed++
errs = append(errs, err)
continue
}
terminating++
}
}
}
Expand Down Expand Up @@ -220,32 +202,23 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
}
podToCreate = append(podToCreate, newPod)
} else {
delete(pods, podName)
if pod.DeletionTimestamp != nil {
glog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name)
terminating++
delete(pods, podName)
continue
}

switch pod.Status.Phase {
case v1.PodPending:
if pod.DeletionTimestamp != nil {
terminating++
} else {
pending++
}
pending++
case v1.PodRunning:
if pod.DeletionTimestamp != nil {
terminating++
} else {
running++
}
running++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
delete(pods, podName)
}
}

Expand All @@ -260,7 +233,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
go func(pod *v1.Pod) {
defer waitCreationGroup.Done()
_, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod)
if err != nil {
if err != nil && !apierrors.IsAlreadyExists(err) {
// Failed to create Pod, waitCreationGroup a moment and then create it again
// This is to ensure all podsMap under the same Job created
// So gang-scheduling could schedule the Job successfully
Expand All @@ -280,6 +253,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate))
}

// TODO: Can hardly imagine when this is necessary.
// Delete unnecessary pods.
waitDeletionGroup := sync.WaitGroup{}
waitDeletionGroup.Add(len(podToDelete))
Expand Down Expand Up @@ -337,16 +311,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn
return nil
}

// calculateVersion normalizes and optionally bumps a version counter:
// a zero current value is treated as 1, and the result is incremented
// once more when bumpVersion is true.
// NOTE(review): the *Controller receiver is unused here; it is kept so
// existing callers invoking this as a method keep compiling.
func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 {
	if current == 0 {
		// Versions start at 1; normalize an uninitialized counter.
		current++
	}
	if bumpVersion {
		current++
	}
	return current
}

func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error {
// If Service does not exist, create one for Job.
if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil {
Expand Down Expand Up @@ -397,68 +361,66 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error {
// If input/output PVC does not exist, create them for Job.
inputPVC := job.Annotations[admissioncontroller.PVCInputName]
outputPVC := job.Annotations[admissioncontroller.PVCOutputName]
if job.Spec.Input != nil {
if job.Spec.Input.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
if job.Spec.Input != nil && job.Spec.Input.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: inputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: inputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Spec: *job.Spec.Input.VolumeClaim,
}
},
Spec: *job.Spec.Input.VolumeClaim,
}

glog.V(3).Infof("Try to create input PVC: %v", pvc)
glog.V(3).Infof("Try to create input PVC: %v", pvc)

if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
}
}
if job.Spec.Output != nil {
if job.Spec.Output.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: outputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
if job.Spec.Output != nil && job.Spec.Output.VolumeClaim != nil {
if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil {
if !apierrors.IsNotFound(err) {
glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}

pvc := &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Namespace: job.Namespace,
Name: outputPVC,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(job, helpers.JobKind),
},
Spec: *job.Spec.Output.VolumeClaim,
}
},
Spec: *job.Spec.Output.VolumeClaim,
}

glog.V(3).Infof("Try to create output PVC: %v", pvc)
glog.V(3).Infof("Try to create output PVC: %v", pvc)

if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil {
if !apierrors.IsAlreadyExists(err) {
glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v",
job.Namespace, job.Name, err)
return err
}
}
}
}

return nil
}

Expand Down
Loading

0 comments on commit 1be22b5

Please sign in to comment.