Skip to content

Commit

Permalink
Merge pull request #36 from volcano-sh/feature/support_task_level_lif…
Browse files Browse the repository at this point in the history
…ecycle

Support TaskComplete in LifecyclePolicy
  • Loading branch information
Klaus Ma authored Mar 24, 2019
2 parents 8a8f448 + 4e0bf83 commit fc95697
Show file tree
Hide file tree
Showing 10 changed files with 166 additions and 5 deletions.
6 changes: 6 additions & 0 deletions pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ const (
OutOfSyncEvent Event = "OutOfSync"
// CommandIssuedEvent is triggered if a command is raised by user
CommandIssuedEvent Event = "CommandIssued"
// TaskCompletedEvent is triggered if the 'Replicas' amount of pods in one task are succeed
TaskCompletedEvent Event = "TaskCompleted"
)

// Action is the action that Job controller will take according to the event.
Expand All @@ -114,6 +116,8 @@ const (
// TerminateJobAction if this action is set, the whole job wil be terminated
// and can not be resumed: all Pod of Job will be evicted, and no Pod will be recreated.
TerminateJobAction Action = "TerminateJob"
//CompleteJobAction if this action is set, the unfinished pods will be killed, job completed.
CompleteJobAction Action = "CompleteJob"

// ResumeJobAction is the action to resume an aborted job.
ResumeJobAction Action = "ResumeJob"
Expand Down Expand Up @@ -170,6 +174,8 @@ const (
Running JobPhase = "Running"
// Restarting is the phase that the Job is restarted, waiting for pod releasing and recreating
Restarting JobPhase = "Restarting"
// Completing is the phase that required tasks of job are completed, job starts to clean up
Completing JobPhase = "Completing"
// Completed is the phase that all tasks of Job are completed
Completed JobPhase = "Completed"
// Terminating is the phase that the Job is terminated, waiting for releasing pods
Expand Down
31 changes: 31 additions & 0 deletions pkg/controllers/job/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,37 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) {
wait.Until(jc.processCleanupJob, 0, stopCh)
}

func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
var taskReplicas, completed int32

jobInfo, found := jc.jobs[jobKey]
if !found {
return false
}

taskPods, found := jobInfo.Pods[taskName]

if !found {
return false
}

for _, task := range jobInfo.Job.Spec.Tasks {
if task.Name == taskName {
taskReplicas = task.Replicas
}
}
if taskReplicas <= 0 {
return false
}

for _, pod := range taskPods {
if pod.Status.Phase == v1.PodSucceeded {
completed += 1
}
}
return completed >= taskReplicas
}

func (jc *jobCache) processCleanupJob() {
obj, shutdown := jc.deletedJobs.Get()
if shutdown {
Expand Down
2 changes: 2 additions & 0 deletions pkg/controllers/job/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ type Cache interface {
AddPod(pod *v1.Pod) error
UpdatePod(pod *v1.Pod) error
DeletePod(pod *v1.Pod) error

TaskCompleted(jobKey, taskName string) bool
}
17 changes: 12 additions & 5 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,12 +197,24 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
return
}

if err := cc.cache.UpdatePod(newPod); err != nil {
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
newPod.Namespace, newPod.Name, err)
}

event := vkbatchv1.OutOfSyncEvent
if oldPod.Status.Phase != v1.PodFailed &&
newPod.Status.Phase == v1.PodFailed {
event = vkbatchv1.PodFailedEvent
}

if oldPod.Status.Phase != v1.PodSucceeded &&
newPod.Status.Phase == v1.PodSucceeded {
if cc.cache.TaskCompleted(vkcache.JobKeyByName(newPod.Namespace, jobName), taskName) {
event = vkbatchv1.TaskCompletedEvent
}
}

req := apis.Request{
Namespace: newPod.Namespace,
JobName: jobName,
Expand All @@ -212,11 +224,6 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
JobVersion: int32(dVersion),
}

if err := cc.cache.UpdatePod(newPod); err != nil {
glog.Errorf("Failed to update Pod <%s/%s>: %v in cache",
newPod.Namespace, newPod.Name, err)
}

cc.queue.Add(req)
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/controllers/job/state/completing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package state

import (
vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
"volcano.sh/volcano/pkg/controllers/job/apis"
)

type completingState struct {
job *apis.JobInfo
}

func (ps *completingState) Execute(action vkv1.Action) error {
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
// If any "alive" pods, still in Completing phase
if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 {
return vkv1.JobState{
Phase: vkv1.Completing,
}
}

return vkv1.JobState{
Phase: vkv1.Completed,
}
})
}
2 changes: 2 additions & 0 deletions pkg/controllers/job/state/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ func NewState(jobInfo *apis.JobInfo) State {
return &abortingState{job: jobInfo}
case vkv1.Aborted:
return &abortedState{job: jobInfo}
case vkv1.Completing:
return &completingState{job: jobInfo}
}

// It's pending by default.
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/job/state/pending.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,17 @@ func (ps *pendingState) Execute(action vkv1.Action) error {
phase = vkv1.Aborting
}

return vkv1.JobState{
Phase: phase,
}
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
}

return vkv1.JobState{
Phase: phase,
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/controllers/job/state/running.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ func (ps *runningState) Execute(action vkv1.Action) error {
phase = vkv1.Terminating
}

return vkv1.JobState{
Phase: phase,
}
})
case vkv1.CompleteJobAction:
return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState {
phase := vkv1.Completed
if status.Terminating != 0 {
phase = vkv1.Completing
}

return vkv1.JobState{
Phase: phase,
}
Expand Down
40 changes: 40 additions & 0 deletions test/e2e/job_error_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,44 @@ var _ = Describe("Job Error Handling", func() {
Expect(err).NotTo(HaveOccurred())
})

It("job level LifecyclePolicy, Event: TaskCompleted; Action: CompletedJob", func() {
By("init test context")
context := initTestContext()
defer cleanupTestContext(context)

By("create job")
job := createJob(context, &jobSpec{
name: "any-restart-job",
policies: []vkv1.LifecyclePolicy{
{
Action: vkv1.CompleteJobAction,
Event: vkv1.TaskCompletedEvent,
},
},
tasks: []taskSpec{
{
name: "completed-task",
img: defaultBusyBoxImage,
min: 2,
rep: 2,
//Sleep 5 seconds ensure job in running state
command: "sleep 5",
},
{
name: "terminating-task",
img: defaultNginxImage,
min: 2,
rep: 2,
},
},
})

By("job scheduled, then task 'completed_task' finished and job finally complete")
// job phase: pending -> running -> completing -> completed
err := waitJobStates(context, job, []vkv1.JobPhase{
vkv1.Pending, vkv1.Running, vkv1.Completing, vkv1.Completed})
Expect(err).NotTo(HaveOccurred())

})

})
10 changes: 10 additions & 0 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,16 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
return nil
}

func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
for _, phase := range phases {
err := wait.Poll(100*time.Millisecond, oneMinute, jobPhaseExpect(ctx, job, phase))
if err != nil {
return err
}
}
return nil
}

func waitJobPhase(ctx *context, job *vkv1.Job, phase vkv1.JobPhase) error {
return wait.Poll(100*time.Millisecond, twoMinute, func() (bool, error) {
newJob, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
Expand Down

0 comments on commit fc95697

Please sign in to comment.