diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index cf952290181..e7fc9f1594f 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -142,7 +142,11 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseM return nil } -func (cc *Controller) createJob(job *batch.Job) (*batch.Job, error) { +func (cc *Controller) initiateJob(job *batch.Job) (*batch.Job, error) { + klog.V(3).Infof("Starting to initiate Job <%s/%s>", job.Namespace, job.Name) + defer klog.V(3).Infof("Finished Job <%s/%s> initiate", job.Namespace, job.Name) + + klog.Infof("Current Version is: %d of job: %s/%s", job.Status.Version, job.Namespace, job.Name) job, err := cc.initJobStatus(job) if err != nil { cc.recorder.Event(job, v1.EventTypeWarning, string(batch.JobStatusError), @@ -185,9 +189,46 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt return nil } - var err error - if job, err = cc.createJob(job); err != nil { - return err + // Skip job initiation if job is already accepted + if job.Status.State.Phase == "" { + var err error + if job, err = cc.initiateJob(job); err != nil { + return err + } + } + + var syncTask bool + if pg, _ := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); pg != nil { + if pg.Status.Phase != "" && pg.Status.Phase != scheduling.PodGroupPending { + syncTask = true + } + } + + if !syncTask { + if updateStatus != nil { + if updateStatus(&job.Status) { + job.Status.State.LastTransitionTime = metav1.Now() + } + } + newJob, err := cc.vcClient.BatchV1alpha1().Jobs(job.Namespace).UpdateStatus(job) + if err != nil { + klog.Errorf("Failed to update status of Job %v/%v: %v", + job.Namespace, job.Name, err) + return err + } + if e := cc.cache.Update(newJob); e != nil { + klog.Errorf("SyncJob - Failed to update Job %v/%v in cache: %v", + newJob.Namespace, newJob.Name, e) + return e + } + return nil + } + + // Skip job task sync if it is pending + if job.Status.State.Phase == batch.Pending { + klog.Infof("Job <%s/%s> is pending, skip pod sync.", + job.Namespace, job.Name) + return nil } var running, pending, terminating, succeeded, failed, unknown int32 @@ -297,7 +338,6 @@ func (cc *Controller) syncJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt fmt.Sprintf("Error deleting pods: %+v", deletionErrs)) return fmt.Errorf("failed to delete %d pods of %d", len(deletionErrs), len(podToDelete)) } - job.Status = batch.JobStatus{ State: job.Status.State, @@ -422,7 +462,7 @@ func (cc *Controller) createPVC(job *batch.Job, vcName string, volumeClaim *v1.P func (cc *Controller) createPodGroupIfNotExist(job *batch.Job) error { // If PodGroup does not exist, create one for Job. - if _, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil { + if pg, err := cc.pgLister.PodGroups(job.Namespace).Get(job.Name); err != nil { if !apierrors.IsNotFound(err) { klog.V(3).Infof("Failed to get PodGroup for Job <%s/%s>: %v", job.Namespace, job.Name, err)