diff --git a/pkg/controllers/apis/request.go b/pkg/controllers/apis/request.go index 0e7454c2d9..08e82e37be 100644 --- a/pkg/controllers/apis/request.go +++ b/pkg/controllers/apis/request.go @@ -32,6 +32,7 @@ type Request struct { JobUid types.UID TaskName string QueueName string + PodName string Event v1alpha1.Event ExitCode int32 @@ -42,8 +43,8 @@ type Request struct { // String function returns the request in string format. func (r Request) String() string { return fmt.Sprintf( - "Queue: %s, Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d", - r.QueueName, r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion) + "Queue: %s, Job: %s/%s, Task:%s, Pod:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d", + r.QueueName, r.Namespace, r.JobName, r.TaskName, r.PodName, r.Event, r.ExitCode, r.Action, r.JobVersion) } // FlowRequest The object of sync operation, used for JobFlow and JobTemplate diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index e0a3275fbc..87b268816c 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -17,9 +17,11 @@ limitations under the License. package job import ( + "context" "fmt" "hash" "hash/fnv" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -60,6 +62,29 @@ func init() { framework.RegisterController(&jobcontroller{}) } +type delayAction struct { + // The namespacing name of the job + jobKey string + + // The name of the task + taskName string + + // The name of the pod + podName string + + // The event caused the action + event busv1alpha1.Event + + // The action to take. + action busv1alpha1.Action + + // The delay before the action is executed + delay time.Duration + + // The cancel function of the action + cancel context.CancelFunc +} + // jobcontroller the Job jobcontroller type. type jobcontroller struct { kubeClient kubernetes.Interface @@ -106,15 +131,20 @@ type jobcontroller struct { queueSynced func() bool // queue that need to sync up - queueList []workqueue.RateLimitingInterface - commandQueue workqueue.RateLimitingInterface + queueList []workqueue.TypedRateLimitingInterface[any] + commandQueue workqueue.TypedRateLimitingInterface[any] cache jobcache.Cache // Job Event recorder recorder record.EventRecorder - errTasks workqueue.RateLimitingInterface + errTasks workqueue.TypedRateLimitingInterface[any] workers uint32 maxRequeueNum int + + delayActionMapLock sync.RWMutex + // delayActionMap stores delayed actions for jobs, where outer map key is job key (namespace/name), + // inner map key is pod name, and value is the delayed action to be performed + delayActionMap map[string]map[string]*delayAction } func (cc *jobcontroller) Name() string { @@ -135,8 +165,8 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"}) cc.informerFactory = sharedInformers - cc.queueList = make([]workqueue.RateLimitingInterface, workers) - cc.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + cc.queueList = make([]workqueue.TypedRateLimitingInterface[any], workers) + cc.commandQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) cc.cache = jobcache.New() cc.errTasks = newRateLimitingQueue() cc.recorder = recorder @@ -148,7 +178,7 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { var i uint32 for i = 0; i < workers; i++ { - cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + cc.queueList[i] = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]()) } factory := opt.VCSharedInformerFactory @@ -226,10 +256,12 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error { cc.queueLister = cc.queueInformer.Lister() cc.queueSynced = cc.queueInformer.Informer().HasSynced + cc.delayActionMap = make(map[string]map[string]*delayAction) + // Register actions state.SyncJob = cc.syncJob state.KillJob = cc.killJob - + state.KillTarget = cc.killTarget return nil } @@ -327,6 +359,37 @@ func (cc *jobcontroller) processNextReq(count uint32) bool { klog.V(3).Infof("Try to handle request <%v>", req) + // Handle cancellation logic for delayed actions: + // 1. When request event is not Pod Pending: + // - Cancel corresponding Pod Pending delayed action + // - When Pod enters Running state, cancel corresponding Pod Failed and Pod Evicted delayed actions + if req.Event != busv1alpha1.PodPendingEvent { + cc.delayActionMapLock.Lock() + if taskMap, exists := cc.delayActionMap[key]; exists { + for podName, delayAct := range taskMap { + shouldCancel := false + + if podName == req.PodName { + if delayAct.event == busv1alpha1.PodPendingEvent { + shouldCancel = true + } + + if req.Event == busv1alpha1.PodRunningEvent && + (delayAct.event == busv1alpha1.PodFailedEvent || delayAct.event == busv1alpha1.PodEvictedEvent) { + shouldCancel = true + } + } + + if shouldCancel { + klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> of Job <%s>", delayAct.action, podName, delayAct.jobKey) + delayAct.cancel() + delete(taskMap, podName) + } + } + } + cc.delayActionMapLock.Unlock() + } + jobInfo, err := cc.cache.Get(key) if err != nil { // TODO(k82cn): ignore not-ready error. @@ -341,34 +404,125 @@ func (cc *jobcontroller) processNextReq(count uint32) bool { return true } - action := applyPolicies(jobInfo.Job, &req) + delayAct := applyPolicies(jobInfo.Job, &req) + + if delayAct.delay != 0 { + klog.V(3).Infof("Execute <%v> on Job <%s/%s> after %s", + delayAct.action, req.Namespace, req.JobName, delayAct.delay.String()) + cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf( + "Execute action %s after %s", delayAct.action, delayAct.delay.String())) + cc.AddDelayActionForJob(req, delayAct) + return true + } + klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.", - action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st) + delayAct.action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st) - if action != busv1alpha1.SyncJobAction { + if delayAct.action != busv1alpha1.SyncJobAction { cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf( - "Start to execute action %s ", action)) + "Start to execute action %s ", delayAct.action)) } + action := GetStateAction(delayAct) + if err := st.Execute(action); err != nil { - if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum { - klog.V(2).Infof("Failed to handle Job <%s/%s>: %v", - jobInfo.Job.Namespace, jobInfo.Job.Name, err) - // If any error, requeue it. - queue.AddRateLimited(req) - return true - } - cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf( - "Job failed on action %s for retry limit reached", action)) - klog.Warningf("Terminating Job <%s/%s> and releasing resources", jobInfo.Job.Namespace, jobInfo.Job.Name) - if err = st.Execute(busv1alpha1.TerminateJobAction); err != nil { - klog.Errorf("Failed to terminate Job<%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, err) - } - klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", jobInfo.Job.Namespace, jobInfo.Job.Name, err) + cc.handleJobError(queue, req, st, err, delayAct.action) } // If no error, forget it. queue.Forget(req) + // If the action is not an internal action, cancel all delayed actions + if !isInternalAction(delayAct.action) { + cc.cleanupDelayActions(delayAct.jobKey, "") + } + return true } + +func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayAction) { + cc.delayActionMapLock.Lock() + defer cc.delayActionMapLock.Unlock() + + m, ok := cc.delayActionMap[delayAct.jobKey] + if !ok { + m = make(map[string]*delayAction) + cc.delayActionMap[delayAct.jobKey] = m + } + if oldDelayAct, exists := m[req.PodName]; exists && oldDelayAct.action == delayAct.action { + return + } + m[req.PodName] = delayAct + + ctx, cancel := context.WithTimeout(context.Background(), delayAct.delay) + delayAct.cancel = cancel + + go func() { + <-ctx.Done() + if ctx.Err() == context.Canceled { + klog.V(4).Infof("Job<%s/%s>'s delayed action %s is canceled", req.Namespace, req.JobName, delayAct.action) + return + } + + klog.V(4).Infof("Job<%s/%s>'s delayed action %s is expired, execute it", req.Namespace, req.JobName, delayAct.action) + + jobInfo, err := cc.cache.Get(delayAct.jobKey) + if err != nil { + klog.Errorf("Failed to get job by <%v> from cache: %v", req, err) + return + } + + st := state.NewState(jobInfo) + if st == nil { + klog.Errorf("Invalid state <%s> of Job <%v/%v>", + jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name) + return + } + queue := cc.getWorkerQueue(delayAct.jobKey) + + if err := st.Execute(GetStateAction(delayAct)); err != nil { + cc.handleJobError(queue, req, st, err, delayAct.action) + } + + queue.Forget(req) + + cc.cleanupDelayActions(delayAct.jobKey, delayAct.podName) + }() + +} + +func (cc *jobcontroller) handleJobError(queue workqueue.TypedRateLimitingInterface[any], req apis.Request, st state.State, err error, action busv1alpha1.Action) { + if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum { + klog.V(2).Infof("Failed to handle Job <%s/%s>: %v", + req.Namespace, req.JobName, err) + queue.AddRateLimited(req) + return + } + + cc.recordJobEvent(req.Namespace, req.JobName, batchv1alpha1.ExecuteAction, + fmt.Sprintf("Job failed on action %s for retry limit reached", action)) + klog.Warningf("Terminating Job <%s/%s> and releasing resources", req.Namespace, req.JobName) + + if err = st.Execute(state.Action{Action: busv1alpha1.TerminateJobAction}); err != nil { + klog.Errorf("Failed to terminate Job<%s/%s>: %v", req.Namespace, req.JobName, err) + } + klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", + req.Namespace, req.JobName, err) +} + +func (cc *jobcontroller) cleanupDelayActions(jobKey string, excludePod string) { + cc.delayActionMapLock.Lock() + defer cc.delayActionMapLock.Unlock() + + if m, exists := cc.delayActionMap[jobKey]; exists { + for podName, v := range m { + if podName == excludePod { + continue + } + if v.cancel != nil { + v.cancel() + } + } + cc.delayActionMap[jobKey] = make(map[string]*delayAction) + } +} diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index a9c6c3b95c..41c72d4fde 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -64,11 +64,26 @@ func (cc *jobcontroller) generateRelatedPodGroupName(job *batch.Job) string { return fmt.Sprintf("%s-%s", job.Name, string(job.UID)) } +func (cc *jobcontroller) killTarget(jobInfo *apis.JobInfo, target state.Target, updateStatus state.UpdateStatusFn) error { + if target.Type == state.TargetTypeTask { + klog.V(3).Infof("Killing task <%s> of Job <%s/%s>, current version %d", target.TaskName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) + defer klog.V(3).Infof("Finished task <%s> of Job <%s/%s> killing, current version %d", target.TaskName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) + } else if target.Type == state.TargetTypePod { + klog.V(3).Infof("Killing pod <%s> of Job <%s/%s>, current version %d", target.PodName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) + defer klog.V(3).Infof("Finished pod <%s> of Job <%s/%s> killing, current version %d", target.PodName, jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) + } + return cc.killPods(jobInfo, nil, &target, updateStatus) +} + func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, updateStatus state.UpdateStatusFn) error { - job := jobInfo.Job - klog.V(3).Infof("Killing Job <%s/%s>, current version %d", job.Namespace, job.Name, job.Status.Version) - defer klog.V(3).Infof("Finished Job <%s/%s> killing, current version %d", job.Namespace, job.Name, job.Status.Version) + klog.V(3).Infof("Killing Job <%s/%s>, current version %d", jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) + defer klog.V(3).Infof("Finished Job <%s/%s> killing, current version %d", jobInfo.Namespace, jobInfo.Name, jobInfo.Job.Status.Version) + + return cc.killPods(jobInfo, podRetainPhase, nil, updateStatus) +} +func (cc *jobcontroller) killPods(jobInfo *apis.JobInfo, podRetainPhase state.PhaseMap, target *state.Target, updateStatus state.UpdateStatusFn) error { + job := jobInfo.Job if job.DeletionTimestamp != nil { klog.Infof("Job <%s/%s> is terminating, skip management process.", job.Namespace, job.Name) @@ -81,44 +96,66 @@ func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.Pha var errs []error var total int - for _, pods := range jobInfo.Pods { - for _, pod := range pods { - total++ + podsToKill := make(map[string]*v1.Pod) - if pod.DeletionTimestamp != nil { - klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name) - terminating++ - continue - } + if target != nil { + if target.Type == state.TargetTypeTask { + podsToKill = jobInfo.Pods[target.TaskName] + } else if target.Type == state.TargetTypePod { + podsToKill[target.PodName] = jobInfo.Pods[target.TaskName][target.PodName] + } + total += len(podsToKill) + } else { + // Job version is bumped only when job is killed + job.Status.Version++ + for _, pods := range jobInfo.Pods { + for _, pod := range pods { + total++ + if pod.DeletionTimestamp != nil { + klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name) + terminating++ + continue + } - maxRetry := job.Spec.MaxRetry - lastRetry := false - if job.Status.RetryCount >= maxRetry-1 { - lastRetry = true - } + maxRetry := job.Spec.MaxRetry + lastRetry := false + if job.Status.RetryCount >= maxRetry-1 { + lastRetry = true + } - // Only retain the Failed and Succeeded pods at the last retry. - // If it is not the last retry, kill pod as defined in `podRetainPhase`. - retainPhase := podRetainPhase - if lastRetry { - retainPhase = state.PodRetainPhaseSoft - } - _, retain := retainPhase[pod.Status.Phase] + // Only retain the Failed and Succeeded pods at the last retry. + // If it is not the last retry, kill pod as defined in `podRetainPhase`. + retainPhase := podRetainPhase + if lastRetry { + retainPhase = state.PodRetainPhaseSoft + } + _, retain := retainPhase[pod.Status.Phase] - if !retain { - err := cc.deleteJobPod(job.Name, pod) - if err == nil { - terminating++ - continue + if !retain { + podsToKill[pod.Name] = pod } - // record the err, and then collect the pod info like retained pod - errs = append(errs, err) - cc.resyncTask(pod) } + } + } - classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown) - calcPodStatus(pod, taskStatusCount) + for _, pod := range podsToKill { + if pod.DeletionTimestamp != nil { + klog.Infof("Pod <%s/%s> is terminating", pod.Namespace, pod.Name) + terminating++ + continue } + + err := cc.deleteJobPod(job.Name, pod) + if err == nil { + terminating++ + continue + } + // record the err, and then collect the pod info like retained pod + errs = append(errs, err) + cc.resyncTask(pod) + + classifyAndAddUpPodBaseOnPhase(pod, &pending, &running, &succeeded, &failed, &unknown) + calcPodStatus(pod, taskStatusCount) } if len(errs) != 0 { @@ -129,8 +166,6 @@ func (cc *jobcontroller) killJob(jobInfo *apis.JobInfo, podRetainPhase state.Pha } job = job.DeepCopy() - // Job version is bumped only when job is killed - job.Status.Version++ job.Status.Pending = pending job.Status.Running = running job.Status.Succeeded = succeeded diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index b97a25be29..fc315e8cbc 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -184,8 +184,9 @@ func (cc *jobcontroller) addPod(obj interface{}) { Namespace: pod.Namespace, JobName: jobName, JobUid: jobUid, + PodName: pod.Name, - Event: bus.OutOfSyncEvent, + Event: bus.PodPendingEvent, JobVersion: int32(dVersion), } @@ -279,10 +280,20 @@ func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) { cc.cache.TaskCompleted(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) { event = bus.TaskCompletedEvent } - case v1.PodPending, v1.PodRunning: + case v1.PodRunning: if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) { event = bus.TaskFailedEvent } + if oldPod.Status.Phase != v1.PodRunning { + event = bus.PodRunningEvent + } + case v1.PodPending: + if cc.cache.TaskFailed(jobcache.JobKeyByName(newPod.Namespace, jobName), taskName) { + event = bus.TaskFailedEvent + } + if oldPod.Status.Phase != v1.PodPending { + event = bus.PodPendingEvent + } } req := apis.Request{ @@ -290,6 +301,7 @@ func (cc *jobcontroller) updatePod(oldObj, newObj interface{}) { JobName: jobName, JobUid: jobUid, TaskName: taskName, + PodName: newPod.Name, Event: event, ExitCode: exitCode, @@ -358,6 +370,7 @@ func (cc *jobcontroller) deletePod(obj interface{}) { JobName: jobName, JobUid: jobUid, TaskName: taskName, + PodName: pod.Name, Event: bus.PodEvictedEvent, JobVersion: int32(dVersion), diff --git a/pkg/controllers/job/job_controller_test.go b/pkg/controllers/job/job_controller_test.go new file mode 100644 index 0000000000..83cb25f242 --- /dev/null +++ b/pkg/controllers/job/job_controller_test.go @@ -0,0 +1,48 @@ +/* +Copyright 2017 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "testing" + "time" + + "k8s.io/client-go/util/workqueue" + "volcano.sh/apis/pkg/apis/bus/v1alpha1" +) + +func Test_jobcontroller_processScheduledAction(t *testing.T) { + + tests := []struct { + name string + }{ + + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + q := workqueue.TypedNewDelayingQueue[*delayAction]() + delayAction := &delayAction{ + jobKey: "test/test", + action: v1alpha1.SyncJobAction, + delay: 5 * time.Second, + } + q.AddAfter(delayAction, delayAction.delay) + + q.AddAfter(delayAction, 0) + }) + } +} diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 0c0e6f943b..2ac9c02eaf 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -32,7 +32,9 @@ import ( "volcano.sh/apis/pkg/apis/helpers" schedulingv2 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" "volcano.sh/volcano/pkg/controllers/apis" + jobcache "volcano.sh/volcano/pkg/controllers/cache" jobhelpers "volcano.sh/volcano/pkg/controllers/job/helpers" + "volcano.sh/volcano/pkg/controllers/job/state" "volcano.sh/volcano/pkg/controllers/util" ) @@ -165,13 +167,23 @@ func createJobPod(job *batch.Job, template *v1.PodTemplateSpec, topologyPolicy b return pod } -func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action { +func applyPolicies(job *batch.Job, req *apis.Request) (delayAct *delayAction) { + delayAct = &delayAction{ + jobKey: jobcache.JobKeyByReq(req), + event: req.Event, + taskName: req.TaskName, + podName: req.PodName, + // default action is sync job + action: v1alpha1.SyncJobAction, + } + if len(req.Action) != 0 { - return req.Action + delayAct.action = req.Action + return } if req.Event == v1alpha1.OutOfSyncEvent { - return v1alpha1.SyncJobAction + return } // Solve the scenario: When pod events accumulate and vcjobs with the same name are frequently created, @@ -179,13 +191,13 @@ func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action { if len(req.JobUid) != 0 && job != nil && req.JobUid != job.UID { klog.V(2).Infof("The req belongs to job(%s/%s) and job uid is %v, but the uid of job(%s/%s) is %v in cache, perform %v action", req.Namespace, req.JobName, req.JobUid, job.Namespace, job.Name, job.UID, v1alpha1.SyncJobAction) - return v1alpha1.SyncJobAction + return } // For all the requests triggered from discarded job resources will perform sync action instead if req.JobVersion < job.Status.Version { klog.Infof("Request %s is outdated, will perform sync instead.", req) - return v1alpha1.SyncJobAction + return } // Overwrite Job level policies @@ -198,13 +210,21 @@ func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action { if len(policyEvents) > 0 && len(req.Event) > 0 { if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) { - return policy.Action + delayAct.action = policy.Action + if policy.Timeout != nil { + delayAct.delay = policy.Timeout.Duration + } + return } } // 0 is not an error code, is prevented in validation admission controller if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode { - return policy.Action + delayAct.action = policy.Action + if policy.Timeout != nil { + delayAct.delay = policy.Timeout.Duration + } + return } } break @@ -218,17 +238,25 @@ func applyPolicies(job *batch.Job, req *apis.Request) v1alpha1.Action { if len(policyEvents) > 0 && len(req.Event) > 0 { if checkEventExist(policyEvents, req.Event) || checkEventExist(policyEvents, v1alpha1.AnyEvent) { - return policy.Action + delayAct.action = policy.Action + if policy.Timeout != nil { + delayAct.delay = policy.Timeout.Duration + } + return } } // 0 is not an error code, is prevented in validation admission controller if policy.ExitCode != nil && *policy.ExitCode == req.ExitCode { - return policy.Action + delayAct.action = policy.Action + if policy.Timeout != nil { + delayAct.delay = policy.Timeout.Duration + } + return } } - return v1alpha1.SyncJobAction + return } func getEventlist(policy batch.LifecyclePolicy) []v1alpha1.Event { @@ -356,3 +384,29 @@ func calTaskRequests(pod *v1.Pod, validReplica int32) v1.ResourceList { } return minReq } + +// isInternalAction checks if the action is an internal action +func isInternalAction(action v1alpha1.Action) bool { + switch action { + case v1alpha1.SyncJobAction, + v1alpha1.EnqueueAction, + v1alpha1.SyncQueueAction, + v1alpha1.OpenQueueAction, + v1alpha1.CloseQueueAction: + return true + default: + return false + } +} + +func GetStateAction(delayAct *delayAction) state.Action { + action := state.Action{Action: delayAct.action} + + if delayAct.action == v1alpha1.RestartTaskAction { + action.Target = state.Target{TaskName: delayAct.taskName, Type: state.TargetTypeTask} + } else if delayAct.action == v1alpha1.RestartPodAction { + action.Target = state.Target{TaskName: delayAct.taskName, PodName: delayAct.podName, Type: state.TargetTypePod} + } + + return action +} diff --git a/pkg/controllers/job/job_controller_util_test.go b/pkg/controllers/job/job_controller_util_test.go index 25f6b1d20f..db70bdfac7 100644 --- a/pkg/controllers/job/job_controller_util_test.go +++ b/pkg/controllers/job/job_controller_util_test.go @@ -617,7 +617,7 @@ func TestApplyPolicies(t *testing.T) { t.Run(testcase.Name, func(t *testing.T) { action := applyPolicies(testcase.Job, testcase.Request) - if testcase.ReturnVal != "" && action != "" && testcase.ReturnVal != action { + if testcase.ReturnVal != "" && action.action != "" && testcase.ReturnVal != action.action { t.Errorf("Expected return value to be %s but got %s in case %d", testcase.ReturnVal, action, i) } }) diff --git a/pkg/controllers/job/job_state_test.go b/pkg/controllers/job/job_state_test.go index 0eac372406..85bb041487 100644 --- a/pkg/controllers/job/job_state_test.go +++ b/pkg/controllers/job/job_state_test.go @@ -103,7 +103,7 @@ func TestAbortedState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = absState.Execute(testcase.Action) + err = absState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } @@ -218,7 +218,7 @@ func TestAbortingState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = absState.Execute(testcase.Action) + err = absState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } @@ -331,7 +331,7 @@ func TestCompletingState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = testState.Execute(testcase.Action) + err = testState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } @@ -404,7 +404,7 @@ func TestFinishedState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = testState.Execute(testcase.Action) + err = testState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } @@ -715,7 +715,7 @@ func TestPendingState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = testState.Execute(testcase.Action) + err = testState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } @@ -850,7 +850,7 @@ func TestRestartingState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = testState.Execute(testcase.Action) + err = testState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } @@ -1335,7 +1335,7 @@ func TestRunningState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = testState.Execute(testcase.Action) + err = testState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } @@ -1462,7 +1462,7 @@ func TestTerminatingState_Execute(t *testing.T) { t.Error("Error while adding Job in cache") } - err = testState.Execute(testcase.Action) + err = testState.Execute(state.Action{Action: testcase.Action}) if err != nil { t.Errorf("Expected Error not to occur but got: %s", err) } diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index 8d4795af47..b686783fff 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -26,8 +26,8 @@ type abortedState struct { job *apis.JobInfo } -func (as *abortedState) Execute(action v1alpha1.Action) error { - switch action { +func (as *abortedState) Execute(action Action) error { + switch action.Action { case v1alpha1.ResumeJobAction: return KillJob(as.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Restarting diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index 73c9623bb2..4b98ea593b 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -26,8 +26,8 @@ type abortingState struct { job *apis.JobInfo } -func (ps *abortingState) Execute(action v1alpha1.Action) error { - switch action { +func (ps *abortingState) Execute(action Action) error { + switch action.Action { case v1alpha1.ResumeJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Restarting diff --git a/pkg/controllers/job/state/completing.go b/pkg/controllers/job/state/completing.go index 1b7b102e2d..b385eea76f 100644 --- a/pkg/controllers/job/state/completing.go +++ b/pkg/controllers/job/state/completing.go @@ -20,7 +20,6 @@ import ( "fmt" vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1" - "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -28,7 +27,7 @@ type completingState struct { job *apis.JobInfo } -func (ps *completingState) Execute(action v1alpha1.Action) error { +func (ps *completingState) Execute(action Action) error { return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { // If any "alive" pods, still in Completing phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index 022d8cef62..687d9079c2 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -36,6 +36,9 @@ type ActionFn func(job *apis.JobInfo, fn UpdateStatusFn) error // KillActionFn kill all Pods of Job with phase not in podRetainPhase. type KillActionFn func(job *apis.JobInfo, podRetainPhase PhaseMap, fn UpdateStatusFn) error +// KillPodFn kill the Task with given name. +type KillTargetFn func(job *apis.JobInfo, target Target, fn UpdateStatusFn) error + // PodRetainPhaseNone stores no phase. var PodRetainPhaseNone = PhaseMap{} @@ -50,12 +53,32 @@ var ( SyncJob ActionFn // KillJob kill all Pods of Job with phase not in podRetainPhase. KillJob KillActionFn + // KillTarget kill the target with given name. + KillTarget KillTargetFn +) + +type TargetType string + +const ( + TargetTypeTask TargetType = "task" + TargetTypePod TargetType = "pod" ) +type Target struct { + TaskName string + PodName string + Type TargetType +} + +type Action struct { + Action v1alpha1.Action + Target Target +} + // State interface. type State interface { // Execute executes the actions based on current state. - Execute(act v1alpha1.Action) error + Execute(act Action) error } // NewState gets the state from the volcano job Phase. diff --git a/pkg/controllers/job/state/finished.go b/pkg/controllers/job/state/finished.go index d39aa8967e..792246f623 100644 --- a/pkg/controllers/job/state/finished.go +++ b/pkg/controllers/job/state/finished.go @@ -17,7 +17,6 @@ limitations under the License. package state import ( - "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -25,7 +24,7 @@ type finishedState struct { job *apis.JobInfo } -func (ps *finishedState) Execute(action v1alpha1.Action) error { +func (ps *finishedState) Execute(action Action) error { // In finished state, e.g. Completed, always kill the whole job. return KillJob(ps.job, PodRetainPhaseSoft, nil) } diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index 637f900c94..a7de9712c3 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -26,15 +26,20 @@ type pendingState struct { job *apis.JobInfo } -func (ps *pendingState) Execute(action v1alpha1.Action) error { - switch action { +func (ps *pendingState) Execute(action Action) error { + switch action.Action { case v1alpha1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool { status.RetryCount++ status.State.Phase = vcbatch.Restarting return true }) - + case v1alpha1.RestartTaskAction, v1alpha1.RestartPodAction: + return KillTarget(ps.job, action.Target, func(status *vcbatch.JobStatus) bool { + status.RetryCount++ + status.State.Phase = vcbatch.Restarting + return true + }) case v1alpha1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Aborting diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index 7c7bc4404c..3b3ceb0c2f 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -28,27 +28,36 @@ type restartingState struct { job *apis.JobInfo } -func (ps *restartingState) Execute(action v1alpha1.Action) error { - return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool { - // Get the maximum number of retries. - maxRetry := ps.job.Job.Spec.MaxRetry - - if status.RetryCount >= maxRetry { - // Failed is the phase that the job is restarted failed reached the maximum number of retries. - status.State.Phase = vcbatch.Failed - UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) - return true - } - total := int32(0) - for _, task := range ps.job.Job.Spec.Tasks { - total += task.Replicas - } - - if total-status.Terminating >= status.MinAvailable { - status.State.Phase = vcbatch.Pending - return true - } - - return false - }) +func (ps *restartingState) restartingUpdateStatus(status *vcbatch.JobStatus) bool { + // Get the maximum number of retries. + maxRetry := ps.job.Job.Spec.MaxRetry + + if status.RetryCount >= maxRetry { + // Failed is the phase that the job is restarted failed reached the maximum number of retries. + status.State.Phase = vcbatch.Failed + UpdateJobFailed(fmt.Sprintf("%s/%s", ps.job.Job.Namespace, ps.job.Job.Name), ps.job.Job.Spec.Queue) + return true + } + total := int32(0) + for _, task := range ps.job.Job.Spec.Tasks { + total += task.Replicas + } + + if total-status.Terminating >= status.MinAvailable { + status.State.Phase = vcbatch.Pending + return true + } + + return false +} + +func (ps *restartingState) Execute(action Action) error { + switch action.Action { + case v1alpha1.SyncJobAction: + return SyncJob(ps.job, ps.restartingUpdateStatus) + case v1alpha1.RestartTaskAction, v1alpha1.RestartPodAction: + return KillTarget(ps.job, action.Target, ps.restartingUpdateStatus) + default: + return KillJob(ps.job, PodRetainPhaseNone, ps.restartingUpdateStatus) + } } diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index ca0c688a2d..d667d9d3dc 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -30,14 +30,20 @@ type runningState struct { job *apis.JobInfo } -func (ps *runningState) Execute(action v1alpha1.Action) error { - switch action { +func (ps *runningState) Execute(action Action) error { + switch action.Action { case v1alpha1.RestartJobAction: return KillJob(ps.job, PodRetainPhaseNone, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Restarting status.RetryCount++ return true }) + case v1alpha1.RestartTaskAction, v1alpha1.RestartPodAction: + return KillTarget(ps.job, action.Target, func(status *vcbatch.JobStatus) bool { + status.State.Phase = vcbatch.Restarting + status.RetryCount++ + return true + }) case v1alpha1.AbortJobAction: return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { status.State.Phase = vcbatch.Aborting diff --git a/pkg/controllers/job/state/terminating.go b/pkg/controllers/job/state/terminating.go index 81c17f1a93..a8e2161ef3 100644 --- a/pkg/controllers/job/state/terminating.go +++ b/pkg/controllers/job/state/terminating.go @@ -18,7 +18,6 @@ package state import ( vcbatch "volcano.sh/apis/pkg/apis/batch/v1alpha1" - "volcano.sh/apis/pkg/apis/bus/v1alpha1" "volcano.sh/volcano/pkg/controllers/apis" ) @@ -26,7 +25,7 @@ type terminatingState struct { job *apis.JobInfo } -func (ps *terminatingState) Execute(action v1alpha1.Action) error { +func (ps *terminatingState) Execute(action Action) error { return KillJob(ps.job, PodRetainPhaseSoft, func(status *vcbatch.JobStatus) bool { // If any "alive" pods, still in Terminating phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { diff --git a/pkg/webhooks/admission/jobs/validate/util.go b/pkg/webhooks/admission/jobs/validate/util.go index c1923c40cc..8ac74fbf27 100644 --- a/pkg/webhooks/admission/jobs/validate/util.go +++ b/pkg/webhooks/admission/jobs/validate/util.go @@ -32,6 +32,8 @@ var policyEventMap = map[busv1alpha1.Event]bool{ busv1alpha1.AnyEvent: true, busv1alpha1.PodFailedEvent: true, busv1alpha1.PodEvictedEvent: true, + busv1alpha1.PodPendingEvent: true, + busv1alpha1.PodRunningEvent: true, busv1alpha1.JobUnknownEvent: true, busv1alpha1.TaskCompletedEvent: true, busv1alpha1.TaskFailedEvent: true,