Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

retain pod with special phase #176

Merged
merged 1 commit into from
May 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 21 additions & 14 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"volcano.sh/volcano/pkg/controllers/job/state"
)

func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateStatusFn) error {
func (cc *Controller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error {
glog.V(3).Infof("Killing Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name)
defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name)

Expand All @@ -62,20 +62,27 @@ func (cc *Controller) killJob(jobInfo *apis.JobInfo, updateStatus state.UpdateSt
continue
}

if err := cc.deleteJobPod(job.Name, pod); err == nil {
terminating++
} else {
errs = append(errs, err)
switch pod.Status.Phase {
case v1.PodRunning:
running++
case v1.PodPending:
pending++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
_, retain := podRetainPhase[pod.Status.Phase]

if !retain {
err := cc.deleteJobPod(job.Name, pod)
if err == nil {
terminating++
continue
}
// record the err, and then collect the pod info like retained pod
errs = append(errs, err)
}

switch pod.Status.Phase {
case v1.PodRunning:
running++
case v1.PodPending:
pending++
case v1.PodSucceeded:
succeeded++
case v1.PodFailed:
failed++
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controllers/job/state/aborted.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ type abortedState struct {
func (as *abortedState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
return KillJob(as.job, func(status *vkv1.JobStatus) bool {
return KillJob(as.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
status.State.Phase = vkv1.Restarting
status.RetryCount++
return true
})
default:
return KillJob(as.job, nil)
return KillJob(as.job, PodRetainPhaseSoft, nil)
}
}
4 changes: 2 additions & 2 deletions pkg/controllers/job/state/aborting.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ func (ps *abortingState) Execute(action vkv1.Action) error {
switch action {
case vkv1.ResumeJobAction:
// Already in Restarting phase, just sync it
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
status.RetryCount++
return false
})
default:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
// If any "alive" pods, still in Aborting phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return false
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/completing.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type completingState struct {
}

func (ps *completingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
// If any "alive" pods, still in Completing phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return false
Expand Down
14 changes: 12 additions & 2 deletions pkg/controllers/job/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,28 @@ limitations under the License.
package state

import (
"k8s.io/api/core/v1"

vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
)

type PhaseMap map[v1.PodPhase]struct{}
type UpdateStatusFn func(status *vkv1.JobStatus) (jobPhaseChanged bool)
type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error
type KillActionFn func(job *apis.JobInfo, podRetainPhase PhaseMap, fn UpdateStatusFn) error

var PodRetainPhaseNone = PhaseMap{}
var PodRetainPhaseSoft = PhaseMap{
v1.PodSucceeded: {},
v1.PodFailed: {},
}

var (
// SyncJob will create or delete Pods according to Job's spec.
SyncJob ActionFn
// KillJob kill all Pods of Job.
KillJob ActionFn
// KillJob kill all Pods of Job with phase not in podRetainPhase.
KillJob KillActionFn
// CreateJob will prepare to create Job.
CreateJob ActionFn
)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/finished.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ type finishedState struct {

func (ps *finishedState) Execute(action vkv1.Action) error {
// In finished state, e.g. Completed, always kill the whole job.
return KillJob(ps.job, nil)
return KillJob(ps.job, PodRetainPhaseSoft, nil)
}
6 changes: 3 additions & 3 deletions pkg/controllers/job/state/inqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type inqueueState struct {
func (ps *inqueueState) Execute(action vkv1.Action) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Restarting
Expand All @@ -39,7 +39,7 @@ func (ps *inqueueState) Execute(action vkv1.Action) error {
})

case vkv1.AbortJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Aborting
Expand All @@ -48,7 +48,7 @@ func (ps *inqueueState) Execute(action vkv1.Action) error {
return true
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
Expand Down
6 changes: 3 additions & 3 deletions pkg/controllers/job/state/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type pendingState struct {
func (ps *pendingState) Execute(action vkv1.Action) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Restarting
Expand All @@ -39,7 +39,7 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
})

case vkv1.AbortJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Pending
if status.Terminating != 0 {
phase = vkv1.Aborting
Expand All @@ -48,7 +48,7 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
return true
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/restarting.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type restartingState struct {
}

func (ps *restartingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
// Get the maximum number of retries.
maxRetry := DefaultMaxRetry
if ps.job.Job.Spec.MaxRetry != 0 {
Expand Down
8 changes: 4 additions & 4 deletions pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type runningState struct {
func (ps *runningState) Execute(action vkv1.Action) error {
switch action {
case vkv1.RestartJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseNone, func(status *vkv1.JobStatus) bool {
if status.Terminating != 0 {
status.State.Phase = vkv1.Restarting
status.RetryCount++
Expand All @@ -37,7 +37,7 @@ func (ps *runningState) Execute(action vkv1.Action) error {
return false
})
case vkv1.AbortJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
if status.Terminating != 0 {
status.State.Phase = vkv1.Aborting
return true
Expand All @@ -46,7 +46,7 @@ func (ps *runningState) Execute(action vkv1.Action) error {
return false
})
case vkv1.TerminateJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
if status.Terminating != 0 {
status.State.Phase = vkv1.Terminating
return true
Expand All @@ -55,7 +55,7 @@ func (ps *runningState) Execute(action vkv1.Action) error {
return false
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/state/terminating.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type terminatingState struct {
}

func (ps *terminatingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status *vkv1.JobStatus) bool {
return KillJob(ps.job, PodRetainPhaseSoft, func(status *vkv1.JobStatus) bool {
// If any "alive" pods, still in Terminating phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return false
Expand Down