Skip to content

Commit

Permalink
retain pod with special phase
Browse files Browse the repository at this point in the history
  • Loading branch information
lminzhw committed May 16, 2019
1 parent 51ee86c commit a72a103
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 34 deletions.
35 changes: 21 additions & 14 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"volcano.sh/volcano/pkg/controllers/job/state"
)

func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
glog.V(3).Infof("Killing Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name)

Expand All @@ -62,20 +62,27 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
continue
}

if err := cc.deleteJobPod(job.Name, pod); err == nil {
terminating++
} else {
errs = append(errs, err)
switch pod.Status.Phase {
case v1.PodRunning:
running++
case v1.PodPending:
pending++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
_, retain := podRetainPhase[pod.Status.Phase]

if !retain {
err := cc.deleteJobPod(job.Name, pod)
if err == nil {
terminating++
continue
}
// record the err, and then collect the pod info like retained pod
errs = append(errs, err)
}

switch pod.Status.Phase {
case v1.PodRunning:
running++
case v1.PodPending:
pending++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/job/state/aborted.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ type abortedState struct {
func (as *abortedState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
return KillJob(as.job, func(status *vkv1.JobStatus) bool {
return KillJob(as.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
status.State.Phase = vkv1.Restarting
status.RetryCount++
return true
})
default:
return KillJob(as.job, nil)
return KillJob(as.job, PodRetainPhaseSoft, nil)
}
}
4 changes: 2 additions & 2 deletions pkg/controllers/job/state/aborting.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (ps *abortingState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
// Already in Restarting phase, just sync it
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
status.RetryCount++
return false
})
default:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
// If any "alive" pods, still in Aborting phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return false
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/completing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type completingState struct {
}

func (ps *completingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
// If any "alive" pods, still in Completing phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return false
Expand Down
14 changes: 12 additions & 2 deletions pkg/controllers/job/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@ limitations under the License.
package state

import (
"k8s.io/api/core/v1"

vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
)

type PhaseMap map[v1.PodPhase]struct{}
type UpdateStatusFn func(status *vkv1.JobStatus) (jobPhaseChanged bool)
type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error
type KillActionFn func(job *apis.JobInfo, podRetainPhase PhaseMap, fn UpdateStatusFn) error

var PodRetainPhaseNone = PhaseMap{}
var PodRetainPhaseSoft = PhaseMap{
v1.PodSucceeded: {},
v1.PodFailed: {},
}

var (
// SyncJob will create or delete Pods according to Job's spec.
SyncJob ActionFn
// KillJob kill all Pods of Job.
KillJob ActionFn
// KillJob kill all Pods of Job with phase not in podRetainPhase.
KillJob KillActionFn
// CreateJob will prepare to create Job.
CreateJob ActionFn
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/finished.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ type finishedState struct {

func (ps *finishedState) Execute(action vkv1.Action) error {
// In finished state, e.g. Completed, always kill the whole job.
return KillJob(ps.job, nil)
return KillJob(ps.job, PodRetainPhaseSoft, nil)
}
6 changes: 3 additions & 3 deletions pkg/controllers/job/state/inqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type inqueueState struct {
func (ps *inqueueState) Execute(action vkv1.Action) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Restarting
Expand All @@ -39,7 +39,7 @@ func (ps *inqueueState) Execute(action vkv1.Action) error {
})

case vkv1.AbortJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Aborting
Expand All @@ -48,7 +48,7 @@ func (ps *inqueueState) Execute(action vkv1.Action) error {
return true
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/job/state/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type pendingState struct {
func (ps *pendingState) Execute(action vkv1.Action) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Restarting
Expand All @@ -39,7 +39,7 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
})

case vkv1.AbortJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Aborting
Expand All @@ -48,7 +48,7 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
return true
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type restartingState struct {
}

func (ps *restartingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
// Get the maximum number of retries.
maxRetry := DefaultMaxRetry
if ps.job.Job.Spec.MaxRetry != 0 {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type runningState struct {
func (ps *runningState) Execute(action vkv1.Action) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
if status.Terminating != 0 {
status.State.Phase = vkv1.Restarting
status.RetryCount++
Expand All @@ -37,7 +37,7 @@ func (ps *runningState) Execute(action vkv1.Action) error {
return false
})
case vkv1.AbortJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
if status.Terminating != 0 {
status.State.Phase = vkv1.Aborting
return true
Expand All @@ -46,7 +46,7 @@ func (ps *runningState) Execute(action vkv1.Action) error {
return false
})
case vkv1.TerminateJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
if status.Terminating != 0 {
status.State.Phase = vkv1.Terminating
return true
Expand All @@ -55,7 +55,7 @@ func (ps *runningState) Execute(action vkv1.Action) error {
return false
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/terminating.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type terminatingState struct {
}

func (ps *terminatingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
// If any "alive" pods, still in Terminating phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return false
Expand Down

0 comments on commit a72a103

Please sign in to comment.