Skip to content

Commit

Permalink
support more actions for volcano job failure scenario
Browse files Browse the repository at this point in the history
Signed-off-by: Box Zhang <wszwbsddbk@gmail.com>
  • Loading branch information
bibibox committed Dec 27, 2024
1 parent 8962bf8 commit 5baff31
Show file tree
Hide file tree
Showing 19 changed files with 663 additions and 126 deletions.
3 changes: 2 additions & 1 deletion pkg/controllers/apis/job_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,12 +298,13 @@ func TestRequest_String(t *testing.T) {
JobName: "testjobname",
QueueName: "testqueuename",
TaskName: "testtaskname",
PodName: "testpodname",
Event: vcbus.AnyEvent,
ExitCode: 0,
Action: vcbus.SyncJobAction,
JobVersion: 0,
},
ExpectedValue: "Queue: testqueuename, Job: testnamespace/testjobname, Task:testtaskname, Event:*, ExitCode:0, Action:SyncJob, JobVersion: 0",
ExpectedValue: "Queue: testqueuename, Job: testnamespace/testjobname, Task:testtaskname, Pod:testpodname, Event:*, ExitCode:0, Action:SyncJob, JobVersion: 0",
},
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/controllers/apis/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Request struct {
JobUid types.UID
TaskName string
QueueName string
PodName string

Event v1alpha1.Event
ExitCode int32
Expand All @@ -42,8 +43,8 @@ type Request struct {
// String function returns the request in string format.
func (r Request) String() string {
return fmt.Sprintf(
"Queue: %s, Job: %s/%s, Task:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.QueueName, r.Namespace, r.JobName, r.TaskName, r.Event, r.ExitCode, r.Action, r.JobVersion)
"Queue: %s, Job: %s/%s, Task:%s, Pod:%s, Event:%s, ExitCode:%d, Action:%s, JobVersion: %d",
r.QueueName, r.Namespace, r.JobName, r.TaskName, r.PodName, r.Event, r.ExitCode, r.Action, r.JobVersion)
}

// FlowRequest The object of sync operation, used for JobFlow and JobTemplate
Expand Down
204 changes: 179 additions & 25 deletions pkg/controllers/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ limitations under the License.
package job

import (
"context"
"fmt"
"hash"
"hash/fnv"
"sync"
"time"

v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -60,6 +62,29 @@ func init() {
framework.RegisterController(&jobcontroller{})
}

type delayAction struct {
// The namespacing name of the job
jobKey string

// The name of the task
taskName string

// The name of the pod
podName string

// The event caused the action
event busv1alpha1.Event

// The action to take.
action busv1alpha1.Action

// The delay before the action is executed
delay time.Duration

// The cancel function of the action
cancel context.CancelFunc
}

// jobcontroller the Job jobcontroller type.
type jobcontroller struct {
kubeClient kubernetes.Interface
Expand Down Expand Up @@ -106,15 +131,20 @@ type jobcontroller struct {
queueSynced func() bool

// queue that need to sync up
queueList []workqueue.RateLimitingInterface
commandQueue workqueue.RateLimitingInterface
queueList []workqueue.TypedRateLimitingInterface[any]
commandQueue workqueue.TypedRateLimitingInterface[any]
cache jobcache.Cache
// Job Event recorder
recorder record.EventRecorder

errTasks workqueue.RateLimitingInterface
errTasks workqueue.TypedRateLimitingInterface[any]
workers uint32
maxRequeueNum int

delayActionMapLock sync.RWMutex
// delayActionMap stores delayed actions for jobs, where outer map key is job key (namespace/name),
// inner map key is pod name, and value is the delayed action to be performed
delayActionMap map[string]map[string]*delayAction
}

func (cc *jobcontroller) Name() string {
Expand All @@ -135,8 +165,8 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
recorder := eventBroadcaster.NewRecorder(vcscheme.Scheme, v1.EventSource{Component: "vc-controller-manager"})

cc.informerFactory = sharedInformers
cc.queueList = make([]workqueue.RateLimitingInterface, workers)
cc.commandQueue = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
cc.queueList = make([]workqueue.TypedRateLimitingInterface[any], workers)
cc.commandQueue = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
cc.cache = jobcache.New()
cc.errTasks = newRateLimitingQueue()
cc.recorder = recorder
Expand All @@ -148,7 +178,7 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {

var i uint32
for i = 0; i < workers; i++ {
cc.queueList[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
cc.queueList[i] = workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[any]())
}

factory := opt.VCSharedInformerFactory
Expand Down Expand Up @@ -226,10 +256,12 @@ func (cc *jobcontroller) Initialize(opt *framework.ControllerOption) error {
cc.queueLister = cc.queueInformer.Lister()
cc.queueSynced = cc.queueInformer.Informer().HasSynced

cc.delayActionMap = make(map[string]map[string]*delayAction)

// Register actions
state.SyncJob = cc.syncJob
state.KillJob = cc.killJob

state.KillTarget = cc.killTarget
return nil
}

Expand Down Expand Up @@ -327,6 +359,8 @@ func (cc *jobcontroller) processNextReq(count uint32) bool {

klog.V(3).Infof("Try to handle request <%v>", req)

cc.CleanPodDelayActionsIfNeed(req)

jobInfo, err := cc.cache.Get(key)
if err != nil {
// TODO(k82cn): ignore not-ready error.
Expand All @@ -341,34 +375,154 @@ func (cc *jobcontroller) processNextReq(count uint32) bool {
return true
}

action := applyPolicies(jobInfo.Job, &req)
delayAct := applyPolicies(jobInfo.Job, &req)

if delayAct.delay != 0 {
klog.V(3).Infof("Execute <%v> on Job <%s/%s> after %s",
delayAct.action, req.Namespace, req.JobName, delayAct.delay.String())
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Execute action %s after %s", delayAct.action, delayAct.delay.String()))
cc.AddDelayActionForJob(req, delayAct)
return true
}

klog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.",
action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)
delayAct.action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st)

if action != busv1alpha1.SyncJobAction {
if delayAct.action != busv1alpha1.SyncJobAction {
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Start to execute action %s ", action))
"Start to execute action %s ", delayAct.action))
}

action := GetStateAction(delayAct)

if err := st.Execute(action); err != nil {
if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
jobInfo.Job.Namespace, jobInfo.Job.Name, err)
// If any error, requeue it.
queue.AddRateLimited(req)
return true
}
cc.recordJobEvent(jobInfo.Job.Namespace, jobInfo.Job.Name, batchv1alpha1.ExecuteAction, fmt.Sprintf(
"Job failed on action %s for retry limit reached", action))
klog.Warningf("Terminating Job <%s/%s> and releasing resources", jobInfo.Job.Namespace, jobInfo.Job.Name)
if err = st.Execute(busv1alpha1.TerminateJobAction); err != nil {
klog.Errorf("Failed to terminate Job<%s/%s>: %v", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
}
klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached", jobInfo.Job.Namespace, jobInfo.Job.Name, err)
cc.handleJobError(queue, req, st, err, delayAct.action)
}

// If no error, forget it.
queue.Forget(req)

// If the action is not an internal action, cancel all delayed actions
if !isInternalAction(delayAct.action) {
cc.cleanupDelayActions(delayAct.jobKey)
}

return true
}

// CleanPodDelayActionsIfNeed is used to clean delayed actions for Pod events when the pod phase changed:
// if the event is not PodPending event:
// - cancel corresponding Pod Pending delayed action
// - if the event is PodRunning state, cancel corresponding Pod Failed and Pod Evicted delayed actions
func (cc *jobcontroller) CleanPodDelayActionsIfNeed(req apis.Request) {
if req.Event != busv1alpha1.PodPendingEvent {
key := jobcache.JobKeyByReq(&req)
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()

if taskMap, exists := cc.delayActionMap[key]; exists {
if delayAct, exists := taskMap[req.PodName]; exists {
shouldCancel := false

if delayAct.event == busv1alpha1.PodPendingEvent {
shouldCancel = true
}

if (delayAct.event == busv1alpha1.PodFailedEvent || delayAct.event == busv1alpha1.PodEvictedEvent) &&
req.Event == busv1alpha1.PodRunningEvent {
shouldCancel = true
}

if shouldCancel {
klog.V(3).Infof("Cancel delayed action <%v> for pod <%s> of Job <%s>", delayAct.action, req.PodName, delayAct.jobKey)
delayAct.cancel()
delete(taskMap, req.PodName)
}
}
}
}
}

func (cc *jobcontroller) AddDelayActionForJob(req apis.Request, delayAct *delayAction) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()

m, ok := cc.delayActionMap[delayAct.jobKey]
if !ok {
m = make(map[string]*delayAction)
cc.delayActionMap[delayAct.jobKey] = m
}
if oldDelayAct, exists := m[req.PodName]; exists && oldDelayAct.action == delayAct.action {
return
}
m[req.PodName] = delayAct

ctx, cancel := context.WithTimeout(context.Background(), delayAct.delay)
delayAct.cancel = cancel

go func() {
<-ctx.Done()
if ctx.Err() == context.Canceled {
klog.V(4).Infof("Job<%s/%s>'s delayed action %s is canceled", req.Namespace, req.JobName, delayAct.action)
return
}

klog.V(4).Infof("Job<%s/%s>'s delayed action %s is expired, execute it", req.Namespace, req.JobName, delayAct.action)

jobInfo, err := cc.cache.Get(delayAct.jobKey)
if err != nil {
klog.Errorf("Failed to get job by <%v> from cache: %v", req, err)
return
}

st := state.NewState(jobInfo)
if st == nil {
klog.Errorf("Invalid state <%s> of Job <%v/%v>",
jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name)
return
}
queue := cc.getWorkerQueue(delayAct.jobKey)

if err := st.Execute(GetStateAction(delayAct)); err != nil {
cc.handleJobError(queue, req, st, err, delayAct.action)
}

queue.Forget(req)

cc.cleanupDelayActions(delayAct.jobKey)
}()
}

func (cc *jobcontroller) handleJobError(queue workqueue.TypedRateLimitingInterface[any], req apis.Request, st state.State, err error, action busv1alpha1.Action) {
if cc.maxRequeueNum == -1 || queue.NumRequeues(req) < cc.maxRequeueNum {
klog.V(2).Infof("Failed to handle Job <%s/%s>: %v",
req.Namespace, req.JobName, err)
queue.AddRateLimited(req)
return
}

cc.recordJobEvent(req.Namespace, req.JobName, batchv1alpha1.ExecuteAction,
fmt.Sprintf("Job failed on action %s for retry limit reached", action))
klog.Warningf("Terminating Job <%s/%s> and releasing resources", req.Namespace, req.JobName)

if err = st.Execute(state.Action{Action: busv1alpha1.TerminateJobAction}); err != nil {
klog.Errorf("Failed to terminate Job<%s/%s>: %v", req.Namespace, req.JobName, err)
}
klog.Warningf("Dropping job<%s/%s> out of the queue: %v because max retries has reached",
req.Namespace, req.JobName, err)
}

func (cc *jobcontroller) cleanupDelayActions(jobKey string) {
cc.delayActionMapLock.Lock()
defer cc.delayActionMapLock.Unlock()

if m, exists := cc.delayActionMap[jobKey]; exists {
for _, delayAct := range m {
if delayAct.cancel != nil {
delayAct.cancel()
}
}
cc.delayActionMap[jobKey] = make(map[string]*delayAction)
}
}
Loading

0 comments on commit 5baff31

Please sign in to comment.