From dc2d48f243b6cd8daabc8dbe5fd7935885d4e2a5 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Thu, 18 Apr 2019 14:09:42 +0800 Subject: [PATCH 1/4] remove duplicate --- pkg/controllers/job/job_controller_actions.go | 36 +++++-------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 4b7729c6ff..c7dddd1967 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -66,38 +66,20 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn continue } - switch pod.Status.Phase { - case v1.PodRunning: - err := cc.deleteJobPod(job.Name, pod) - if err != nil { - running++ - errs = append(errs, err) - continue - } + if err := cc.deleteJobPod(job.Name, pod); err != nil { terminating++ - case v1.PodPending: - err := cc.deleteJobPod(job.Name, pod) - if err != nil { + } else { + errs = append(errs, err) + switch pod.Status.Phase { + case v1.PodRunning: + running++ + case v1.PodPending: pending++ - errs = append(errs, err) - continue - } - terminating++ - case v1.PodSucceeded: - err := cc.deleteJobPod(job.Name, pod) - if err != nil { + case v1.PodSucceeded: succeeded++ - errs = append(errs, err) - continue - } - case v1.PodFailed: - err := cc.deleteJobPod(job.Name, pod) - if err != nil { + case v1.PodFailed: failed++ - errs = append(errs, err) - continue } - terminating++ } } } From 5340363231d6a16464774e8e89b8a0e83242e843 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Thu, 18 Apr 2019 15:15:55 +0800 Subject: [PATCH 2/4] clean code --- pkg/apis/helpers/helpers.go | 10 -- pkg/controllers/job/job_controller_actions.go | 120 ++++++++---------- 2 files changed, 50 insertions(+), 80 deletions(-) diff --git a/pkg/apis/helpers/helpers.go b/pkg/apis/helpers/helpers.go index cf4ae2a91b..9b30b50c27 100644 --- a/pkg/apis/helpers/helpers.go +++ b/pkg/apis/helpers/helpers.go @@ -94,16 +94,6 @@ func CreateConfigMapIfNotExist(job *vkv1.Job, kubeClients *kubernetes.Clientset, } func DeleteConfigmap(job *vkv1.Job, kubeClients *kubernetes.Clientset, cmName string) error { - if _, err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Get(cmName, metav1.GetOptions{}); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get Configmap for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } else { - return nil - } - } - if err := kubeClients.CoreV1().ConfigMaps(job.Namespace).Delete(cmName, nil); err != nil { if !apierrors.IsNotFound(err) { glog.Errorf("Failed to delete Configmap of Job %v/%v: %v", diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index c7dddd1967..a4e810c73d 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -202,32 +202,23 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn } podToCreate = append(podToCreate, newPod) } else { + delete(pods, podName) if pod.DeletionTimestamp != nil { glog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name) terminating++ - delete(pods, podName) continue } switch pod.Status.Phase { case v1.PodPending: - if pod.DeletionTimestamp != nil { - terminating++ - } else { - pending++ - } + pending++ case v1.PodRunning: - if pod.DeletionTimestamp != nil { - terminating++ - } else { - running++ - } + running++ case v1.PodSucceeded: succeeded++ case v1.PodFailed: failed++ } - delete(pods, podName) } } @@ -242,7 +233,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn go func(pod *v1.Pod) { defer waitCreationGroup.Done() _, err := cc.kubeClients.CoreV1().Pods(pod.Namespace).Create(pod) - if err != nil { + if err != nil && !apierrors.IsAlreadyExists(err) { // Failed to create Pod, waitCreationGroup a moment and then create it again // This is to ensure all podsMap under the same Job created // So gang-scheduling could schedule the Job successfully @@ -262,6 +253,7 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn return fmt.Errorf("failed to create %d pods of %d", len(creationErrs), len(podToCreate)) } + // TODO: Can hardly imagine when this is necessary. // Delete unnecessary pods. waitDeletionGroup := sync.WaitGroup{} waitDeletionGroup.Add(len(podToDelete)) @@ -319,16 +311,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn return nil } -func (cc *Controller) calculateVersion(current int32, bumpVersion bool) int32 { - if current == 0 { - current += 1 - } - if bumpVersion { - current += 1 - } - return current -} - func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error { // If Service does not exist, create one for Job. if _, err := cc.svcLister.Services(job.Namespace).Get(job.Name); err != nil { @@ -379,68 +361,66 @@ func (cc *Controller) createJobIOIfNotExist(job *vkv1.Job) error { // If input/output PVC does not exist, create them for Job. inputPVC := job.Annotations[admissioncontroller.PVCInputName] outputPVC := job.Annotations[admissioncontroller.PVCOutputName] - if job.Spec.Input != nil { - if job.Spec.Input.VolumeClaim != nil { - if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } + if job.Spec.Input != nil && job.Spec.Input.VolumeClaim != nil { + if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(inputPVC); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get input PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: inputPVC, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: inputPVC, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), }, - Spec: *job.Spec.Input.VolumeClaim, - } + }, + Spec: *job.Spec.Input.VolumeClaim, + } - glog.V(3).Infof("Try to create input PVC: %v", pvc) + glog.V(3).Infof("Try to create input PVC: %v", pvc) - if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil { - glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } + if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil { + glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err } } } - if job.Spec.Output != nil { - if job.Spec.Output.VolumeClaim != nil { - if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil { - if !apierrors.IsNotFound(err) { - glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } - pvc := &v1.PersistentVolumeClaim{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: job.Namespace, - Name: outputPVC, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(job, helpers.JobKind), - }, + if job.Spec.Output != nil && job.Spec.Output.VolumeClaim != nil { + if _, err := cc.pvcLister.PersistentVolumeClaims(job.Namespace).Get(outputPVC); err != nil { + if !apierrors.IsNotFound(err) { + glog.V(3).Infof("Failed to get output PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err + } + + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: job.Namespace, + Name: outputPVC, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(job, helpers.JobKind), }, - Spec: *job.Spec.Output.VolumeClaim, - } + }, + Spec: *job.Spec.Output.VolumeClaim, + } - glog.V(3).Infof("Try to create output PVC: %v", pvc) + glog.V(3).Infof("Try to create output PVC: %v", pvc) - if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil { - if !apierrors.IsAlreadyExists(err) { - glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", - job.Namespace, job.Name, err) - return err - } + if _, err := cc.kubeClients.CoreV1().PersistentVolumeClaims(job.Namespace).Create(pvc); err != nil { + if !apierrors.IsAlreadyExists(err) { + glog.V(3).Infof("Failed to create input PVC for Job <%s/%s>: %v", + job.Namespace, job.Name, err) + return err } } } } + return nil } From 3e8b64abc7d915cf7cdd0cb596ebc021ed33d541 Mon Sep 17 00:00:00 2001 From: xuzhonghu Date: Thu, 18 Apr 2019 15:33:26 +0800 Subject: [PATCH 3/4] simplify resource event handler --- pkg/controllers/job/job_controller.go | 52 +++---------------- pkg/controllers/job/job_controller_handler.go | 41 +++++++++------ 2 files changed, 32 insertions(+), 61 deletions(-) diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index c3b7715a5d..bbe28ea905 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -17,12 +17,9 @@ limitations under the License. package job import ( - "fmt" - "github.com/golang/glog" "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -38,8 +35,6 @@ import ( kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1" kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1" - v1corev1 "volcano.sh/volcano/pkg/apis/bus/v1alpha1" - "volcano.sh/volcano/pkg/apis/helpers" vkver "volcano.sh/volcano/pkg/client/clientset/versioned" vkscheme "volcano.sh/volcano/pkg/client/clientset/versioned/scheme" vkinfoext "volcano.sh/volcano/pkg/client/informers/externalversions" @@ -127,53 +122,18 @@ func NewJobController(config *rest.Config) *Controller { cc.jobSynced = cc.jobInformer.Informer().HasSynced cc.cmdInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Bus().V1alpha1().Commands() - cc.cmdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1corev1.Command: - return helpers.ControlledBy(t, helpers.JobKind) - case cache.DeletedFinalStateUnknown: - if cmd, ok := t.Obj.(*v1corev1.Command); ok { - return helpers.ControlledBy(cmd, helpers.JobKind) - } - runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Command", obj)) - return false - default: - runtime.HandleError(fmt.Errorf("unable to handle object %T", obj)) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addCommand, - }, + cc.cmdInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addCommand, }) cc.cmdLister = cc.cmdInformer.Lister() cc.cmdSynced = cc.cmdInformer.Informer().HasSynced cc.sharedInformers = informers.NewSharedInformerFactory(cc.kubeClients, 0) podInformer := cc.sharedInformers.Core().V1().Pods() - - podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: func(obj interface{}) bool { - switch t := obj.(type) { - case *v1.Pod: - return helpers.ControlledBy(t, helpers.JobKind) - case cache.DeletedFinalStateUnknown: - if pod, ok := t.Obj.(*v1.Pod); ok { - return helpers.ControlledBy(pod, helpers.JobKind) - } - runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod", obj)) - return false - default: - runtime.HandleError(fmt.Errorf("unable to handle object %T", obj)) - return false - } - }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: cc.addPod, - UpdateFunc: cc.updatePod, - DeleteFunc: cc.deletePod, - }, + podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: cc.addPod, + UpdateFunc: cc.updatePod, + DeleteFunc: cc.deletePod, }) cc.podLister = podInformer.Lister() diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 4b9197d506..a32af324c1 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -24,6 +24,7 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/tools/cache" kbtype "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" @@ -103,8 +104,17 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { func (cc *Controller) deleteJob(obj interface{}) { job, ok := obj.(*vkbatchv1.Job) if !ok { - glog.Errorf("obj is not Job") - return + // If we reached here it means the Job was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + job, ok = tombstone.Obj.(*vkbatchv1.Job) + if !ok { + glog.Errorf("Tombstone contained object that is not a volcano Job: %#v", obj) + return + } } if err := cc.cache.Delete(job); err != nil { @@ -228,20 +238,19 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { } func (cc *Controller) deletePod(obj interface{}) { - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = t - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*v1.Pod) + pod, ok := obj.(*v1.Pod) + if !ok { + // If we reached here it means the pod was deleted but its final state is unrecorded. + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - glog.Errorf("Cannot convert to *v1.Pod: %v", t.Obj) + glog.Errorf("Couldn't get object from tombstone %#v", obj) + return + } + pod, ok = tombstone.Obj.(*v1.Pod) + if !ok { + glog.Errorf("Tombstone contained object that is not a Pod: %#v", obj) return } - default: - glog.Errorf("Cannot convert to *v1.Pod: %v", t) - return } taskName, found := pod.Annotations[vkbatchv1.TaskSpecKey] @@ -314,8 +323,10 @@ func (cc *Controller) processNextCommand() bool { defer cc.commandQueue.Done(cmd) if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil { - glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name) - cc.commandQueue.AddRateLimited(cmd) + if !apierrors.IsNotFound(err) { + glog.Errorf("Failed to delete Command <%s/%s>.", cmd.Namespace, cmd.Name) + cc.commandQueue.AddRateLimited(cmd) + } return true } cc.recordJobEvent(cmd.Namespace, cmd.TargetObject.Name, From 159994c1caf9a3376a6cd729173f5a2acc69e840 Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Thu, 18 Apr 2019 17:23:00 +0800 Subject: [PATCH 4/4] fix comment --- pkg/controllers/job/job_controller_actions.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index a4e810c73d..db4e0ba2cb 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -66,7 +66,7 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn continue } - if err := cc.deleteJobPod(job.Name, pod); err != nil { + if err := cc.deleteJobPod(job.Name, pod); err == nil { terminating++ } else { errs = append(errs, err)