From 49e8af2c11fef1017ba5b0e270858e9d2644d607 Mon Sep 17 00:00:00 2001 From: "Da K. Ma" Date: Sun, 8 Jul 2018 17:48:06 +0800 Subject: [PATCH] Enhanced job deletion. Signed-off-by: Da K. Ma --- pkg/scheduler/api/helpers.go | 5 +++++ pkg/scheduler/cache/cache.go | 32 +++++++++++++-------------- pkg/scheduler/cache/event_handlers.go | 11 ++++++++- 3 files changed, 30 insertions(+), 18 deletions(-) diff --git a/pkg/scheduler/api/helpers.go b/pkg/scheduler/api/helpers.go index 45f08ebcaf..8fa8279ac6 100644 --- a/pkg/scheduler/api/helpers.go +++ b/pkg/scheduler/api/helpers.go @@ -95,3 +95,8 @@ func MergeErrors(errs ...error) error { return nil } + +// JobTerminated checks whether job was terminated. +func JobTerminated(job *JobInfo) bool { + return job.SchedSpec == nil && len(job.Tasks) == 0 +} diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index 2a17ad89e6..676ca4bdb3 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -319,23 +319,13 @@ func (sc *SchedulerCache) Bind(taskInfo *arbapi.TaskInfo, hostname string) error } func (sc *SchedulerCache) deleteJob(job *arbapi.JobInfo) { + glog.V(3).Infof("Try to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name) + time.AfterFunc(5*time.Second, func() { sc.deletedJobs.AddIfNotPresent(job) }) } -func (sc *SchedulerCache) cleanupJob(job *arbapi.JobInfo) error { - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - if job.SchedSpec == nil && len(job.Tasks) == 0 { - delete(sc.Jobs, job.UID) - return nil - } - - return fmt.Errorf("Job <%v/%v> is not ready to clean up", job.Namespace, job.Name) -} - func (sc *SchedulerCache) processCleanupJob() error { _, err := sc.deletedJobs.Pop(func(obj interface{}) error { job, ok := obj.(*arbapi.JobInfo) @@ -343,11 +333,19 @@ func (sc *SchedulerCache) processCleanupJob() error { return fmt.Errorf("failed to convert %v to *v1.Pod", obj) } - if err := sc.cleanupJob(job); err != nil { - // Requeue Job to wait for all tasks deleted.
- sc.deleteJob(job) - return err - } + func() { + sc.Mutex.Lock() + defer sc.Mutex.Unlock() + + if arbapi.JobTerminated(job) { + delete(sc.Jobs, job.UID) + glog.V(3).Infof("Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name) + } else { + // Retry + sc.deleteJob(job) + } + }() + return nil }) diff --git a/pkg/scheduler/cache/event_handlers.go b/pkg/scheduler/cache/event_handlers.go index ead6c25476..7837988b3f 100644 --- a/pkg/scheduler/cache/event_handlers.go +++ b/pkg/scheduler/cache/event_handlers.go @@ -131,7 +131,16 @@ func (sc *SchedulerCache) deleteTask(pi *arbapi.TaskInfo) error { // Assumes that lock is already acquired. func (sc *SchedulerCache) deletePod(pod *v1.Pod) error { pi := arbapi.NewTaskInfo(pod) - return sc.deleteTask(pi) + if err := sc.deleteTask(pi); err != nil { + return err + } + + // If job was terminated, delete it. + if job, found := sc.Jobs[pi.Job]; found && arbapi.JobTerminated(job) { + sc.deleteJob(job) + } + + return nil } func (sc *SchedulerCache) AddPod(obj interface{}) {