From a72a1032a2754c3854800ea6101b614e844b91d7 Mon Sep 17 00:00:00 2001 From: lminzhw Date: Thu, 16 May 2019 16:51:47 +0800 Subject: [PATCH] retain pod with special phase --- pkg/controllers/job/job_controller_actions.go | 35 +++++++++++-------- pkg/controllers/job/state/aborted.go | 4 +-- pkg/controllers/job/state/aborting.go | 4 +-- pkg/controllers/job/state/completing.go | 2 +- pkg/controllers/job/state/factory.go | 14 ++++++-- pkg/controllers/job/state/finished.go | 2 +- pkg/controllers/job/state/inqueue.go | 6 ++-- pkg/controllers/job/state/pending.go | 6 ++-- pkg/controllers/job/state/restarting.go | 2 +- pkg/controllers/job/state/running.go | 8 ++--- pkg/controllers/job/state/terminating.go | 2 +- 11 files changed, 51 insertions(+), 34 deletions(-) diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index a88382761fb..8d2bc0200ab 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -35,7 +35,7 @@ import ( "volcano.sh/volcano/pkg/controllers/job/state" ) -func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error { +func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error { glog.V(3).Infof("Killing Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name) @@ -62,20 +62,27 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt continue } - if err := cc.deleteJobPod(job.Name, pod); err == nil { - terminating++ - } else { - errs = append(errs, err) - switch pod.Status.Phase { - case v1.PodRunning: - running++ - case v1.PodPending: - pending++ - case v1.PodSucceeded: - succeeded++ - case v1.PodFailed: - failed++ + _, retain := podRetainPhase[pod.Status.Phase] + + if !retain { + err := cc.deleteJobPod(job.Name, pod) + if err == nil { + terminating++ + continue } + // record the err, and then collect the pod info like retained pod + errs = append(errs, err) + } + + switch pod.Status.Phase { + case v1.PodRunning: + running++ + case v1.PodPending: + pending++ + case v1.PodSucceeded: + succeeded++ + case v1.PodFailed: + failed++ } } } diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index 953ee392d55..3fa6c5c8bdf 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -28,12 +28,12 @@ type abortedState struct { func (as *abortedState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: - return KillJob(as.job, func(status *vkv1.JobStatus) bool { + return KillJob(as.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { status.State.Phase = vkv1.Restarting status.RetryCount++ return true }) default: - return KillJob(as.job, nil) + return KillJob(as.job, PodRetainPhaseSoft, nil) } } diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index 8d123b6ada5..ef40326507b 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -31,12 +31,12 @@ func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: // Already in Restarting phase, just sync it - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { status.RetryCount++ return false }) default: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { // If any "alive" pods, still in Aborting phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { return false diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go index cb9f7074bd0..acad88e88a1 100644 --- a/pkg/controllers/job/state/completing.go +++ b/pkg/controllers/job/state/completing.go @@ -26,7 +26,7 @@ type completingState struct { } func (ps *completingState) Execute(action vkv1.Action) error { - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { // If any "alive" pods, still in Completing phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { return false diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index f33cb753f76..e3e30a582d9 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -17,18 +17,28 @@ limitations under the License. package state import ( + "k8s.io/api/core/v1" + vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) +type PhaseMap map[v1.PodPhase]struct{} type UpdateStatusFn func(status *vkv1.JobStatus) (jobPhaseChanged bool) type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error +type KillActionFn func(job *apis.JobInfo, podRetainPhase PhaseMap, fn UpdateStatusFn) error + +var PodRetainPhaseNone = PhaseMap{} +var PodRetainPhaseSoft = PhaseMap{ + v1.PodSucceeded: {}, + v1.PodFailed: {}, +} var ( // SyncJob will create or delete Pods according to Job's spec. SyncJob ActionFn - // KillJob kill all Pods of Job. - KillJob ActionFn + // KillJob kill all Pods of Job with phase not in podRetainPhase. + KillJob KillActionFn // CreateJob will prepare to create Job. CreateJob ActionFn ) diff --git a/pkg/controllers/job/state/finished.go b/pkg/controllers/job/state/finished.go index 52480bd95d3..d39671fe6b5 100644 --- a/pkg/controllers/job/state/finished.go +++ b/pkg/controllers/job/state/finished.go @@ -27,5 +27,5 @@ type finishedState struct { func (ps *finishedState) Execute(action vkv1.Action) error { // In finished state, e.g. Completed, always kill the whole job. - return KillJob(ps.job, nil) + return KillJob(ps.job, PodRetainPhaseSoft, nil) } diff --git a/pkg/controllers/job/state/inqueue.go b/pkg/controllers/job/state/inqueue.go index 44cbfa514f0..981aeb40e59 100644 --- a/pkg/controllers/job/state/inqueue.go +++ b/pkg/controllers/job/state/inqueue.go @@ -28,7 +28,7 @@ type inqueueState struct { func (ps *inqueueState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Restarting @@ -39,7 +39,7 @@ func (ps *inqueueState) Execute(action vkv1.Action) error { }) case vkv1.AbortJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Aborting @@ -48,7 +48,7 @@ func (ps *inqueueState) Execute(action vkv1.Action) error { return true }) case vkv1.CompleteJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { phase := vkv1.Completed if status.Terminating != 0 { phase = vkv1.Completing diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index 9cead263a57..38fa2e08a43 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -28,7 +28,7 @@ type pendingState struct { func (ps *pendingState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Restarting @@ -39,7 +39,7 @@ func (ps *pendingState) Execute(action vkv1.Action) error { }) case vkv1.AbortJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { phase := vkv1.Pending if status.Terminating != 0 { phase = vkv1.Aborting @@ -48,7 +48,7 @@ func (ps *pendingState) Execute(action vkv1.Action) error { return true }) case vkv1.CompleteJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { phase := vkv1.Completed if status.Terminating != 0 { phase = vkv1.Completing diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index b11fb460a23..df83503b838 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -26,7 +26,7 @@ type restartingState struct { } func (ps *restartingState) Execute(action vkv1.Action) error { - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { // Get the maximum number of retries. maxRetry := DefaultMaxRetry if ps.job.Job.Spec.MaxRetry != 0 { diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index e19fecc151b..e25b3a77af7 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -28,7 +28,7 @@ type runningState struct { func (ps *runningState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool { if status.Terminating != 0 { status.State.Phase = vkv1.Restarting status.RetryCount++ @@ -37,7 +37,7 @@ func (ps *runningState) Execute(action vkv1.Action) error { return false }) case vkv1.AbortJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { if status.Terminating != 0 { status.State.Phase = vkv1.Aborting return true @@ -46,7 +46,7 @@ func (ps *runningState) Execute(action vkv1.Action) error { return false }) case vkv1.TerminateJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { if status.Terminating != 0 { status.State.Phase = vkv1.Terminating return true @@ -55,7 +55,7 @@ func (ps *runningState) Execute(action vkv1.Action) error { return false }) case vkv1.CompleteJobAction: - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { phase := vkv1.Completed if status.Terminating != 0 { phase = vkv1.Completing diff --git a/pkg/controllers/job/state/terminating.go b/pkg/controllers/job/state/terminating.go index 5ceddecb785..b46c2429d60 100644 --- a/pkg/controllers/job/state/terminating.go +++ b/pkg/controllers/job/state/terminating.go @@ -26,7 +26,7 @@ type terminatingState struct { } func (ps *terminatingState) Execute(action vkv1.Action) error { - return KillJob(ps.job, func(status *vkv1.JobStatus) bool { + return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool { // If any "alive" pods, still in Terminating phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { return false