diff --git a/hack/lib/init.sh b/hack/lib/init.sh index f22a4aeef0..5a98ea2317 100755 --- a/hack/lib/init.sh +++ b/hack/lib/init.sh @@ -43,7 +43,6 @@ kube::log::install_errexit source "${KUBE_ROOT}/hack/lib/version.sh" source "${KUBE_ROOT}/hack/lib/golang.sh" -source "${KUBE_ROOT}/hack/lib/etcd.sh" KUBE_OUTPUT_HOSTBIN="${KUBE_OUTPUT_BINPATH}/$(kube::util::host_platform)" diff --git a/pkg/controllers/job/apis/types.go b/pkg/controllers/job/apis/types.go new file mode 100644 index 0000000000..c1c1583f53 --- /dev/null +++ b/pkg/controllers/job/apis/types.go @@ -0,0 +1,37 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apis + +import ( + "k8s.io/api/core/v1" + + vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" +) + +type JobInfo struct { + Job *vkbatchv1.Job + Pods map[string]map[string]*v1.Pod +} + +type Request struct { + Namespace string + JobName string + TaskName string + + Event vkbatchv1.Event + Action vkbatchv1.Action +} diff --git a/pkg/controllers/job/cache/cache.go b/pkg/controllers/job/cache/cache.go new file mode 100644 index 0000000000..85fca6d76e --- /dev/null +++ b/pkg/controllers/job/cache/cache.go @@ -0,0 +1,252 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cache + +import ( + "fmt" + "sync" + + "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" + + "k8s.io/api/core/v1" +) + +type Cache interface { + Get(key string) (*apis.JobInfo, error) + GetStatus(key string) (*v1alpha1.JobStatus, error) + Add(obj *v1alpha1.Job) error + Update(obj *v1alpha1.Job) error + Delete(obj *v1alpha1.Job) error + + AddPod(pod *v1.Pod) error + UpdatePod(pod *v1.Pod) error + DeletePod(pod *v1.Pod) error +} + +type jobCache struct { + sync.Mutex + + jobs map[string]*apis.JobInfo +} + +func keyFn(ns, name string) string { + return fmt.Sprintf("%s/%s", ns, name) +} + +func JobKeyByReq(req *apis.Request) string { + return keyFn(req.Namespace, req.JobName) +} + +func JobKey(req *v1alpha1.Job) string { + return keyFn(req.Namespace, req.Name) +} + +func jobKeyOfPod(pod *v1.Pod) (string, error) { + jobName, found := pod.Annotations[v1alpha1.JobNameKey] + if !found { + return "", fmt.Errorf("failed to find job name of pod <%s/%s>", + pod.Namespace, pod.Name) + } + + return keyFn(pod.Namespace, jobName), nil +} + +func New() Cache { + return &jobCache{ + jobs: map[string]*apis.JobInfo{}, + } +} + +func (jc *jobCache) Get(key string) (*apis.JobInfo, error) { + jc.Lock() + defer jc.Unlock() + + job, found := jc.jobs[key] + if !found { + return nil, fmt.Errorf("failed to find job <%s>", key) + } + + jobInfo := &apis.JobInfo{ + Job: job.Job, + Pods: make(map[string]map[string]*v1.Pod), + } + + // Copy Pods. + for key, pods := range job.Pods { + jobInfo.Pods[key] = make(map[string]*v1.Pod) + for pn, pod := range pods { + jobInfo.Pods[key][pn] = pod + } + } + + return jobInfo, nil +} + +func (jc *jobCache) GetStatus(key string) (*v1alpha1.JobStatus, error) { + jc.Lock() + defer jc.Unlock() + + job, found := jc.jobs[key] + if !found { + return nil, fmt.Errorf("failed to find job <%s>", key) + } + + status := job.Job.Status + + return &status, nil +} + +func (jc *jobCache) Add(obj *v1alpha1.Job) error { + jc.Lock() + defer jc.Unlock() + + key := JobKey(obj) + if _, found := jc.jobs[key]; found { + return fmt.Errorf("duplicated job <%v>", key) + } + + jc.jobs[key] = &apis.JobInfo{ + Job: obj, + Pods: make(map[string]map[string]*v1.Pod), + } + + return nil +} + +func (jc *jobCache) Update(obj *v1alpha1.Job) error { + jc.Lock() + defer jc.Unlock() + + key := JobKey(obj) + if job, found := jc.jobs[key]; !found { + return fmt.Errorf("failed to find job <%v>", key) + } else { + job.Job = obj + } + + return nil +} + +func (jc *jobCache) Delete(obj *v1alpha1.Job) error { + jc.Lock() + defer jc.Unlock() + + key := JobKey(obj) + if _, found := jc.jobs[key]; !found { + return fmt.Errorf("failed to find job <%v>", key) + } + + delete(jc.jobs, key) + + return nil +} + +func (jc *jobCache) AddPod(pod *v1.Pod) error { + jc.Lock() + defer jc.Unlock() + + key, err := jobKeyOfPod(pod) + if err != nil { + return err + } + + job, found := jc.jobs[key] + if !found { + job = &apis.JobInfo{ + Pods: make(map[string]map[string]*v1.Pod), + } + jc.jobs[key] = job + } + + taskName, found := pod.Annotations[v1alpha1.TaskSpecKey] + if !found { + return fmt.Errorf("failed to taskName of Pod <%s/%s>", + pod.Namespace, pod.Name) + } + + if _, found := job.Pods[taskName]; !found { + job.Pods[taskName] = make(map[string]*v1.Pod) + } + if _, found := job.Pods[taskName][pod.Name]; found { + return fmt.Errorf("duplicated pod") + } + job.Pods[taskName][pod.Name] = pod + + return nil +} + +func (jc *jobCache) UpdatePod(pod *v1.Pod) error { + jc.Lock() + defer jc.Unlock() 
+ + key, err := jobKeyOfPod(pod) + if err != nil { + return err + } + + job, found := jc.jobs[key] + if !found { + job = &apis.JobInfo{ + Pods: make(map[string]map[string]*v1.Pod), + } + jc.jobs[key] = job + } + + taskName, found := pod.Annotations[v1alpha1.TaskSpecKey] + if !found { + return fmt.Errorf("failed to taskName of Pod <%s/%s>", + pod.Namespace, pod.Name) + } + + if _, found := job.Pods[taskName]; !found { + job.Pods[taskName] = make(map[string]*v1.Pod) + } + job.Pods[taskName][pod.Name] = pod + + return nil +} + +func (jc *jobCache) DeletePod(pod *v1.Pod) error { + jc.Lock() + defer jc.Unlock() + + key, err := jobKeyOfPod(pod) + if err != nil { + return err + } + + job, found := jc.jobs[key] + if !found { + job = &apis.JobInfo{ + Pods: make(map[string]map[string]*v1.Pod), + } + jc.jobs[key] = job + } + + taskName, found := pod.Annotations[v1alpha1.TaskSpecKey] + if !found { + return fmt.Errorf("failed to taskName of Pod <%s/%s>", + pod.Namespace, pod.Name) + } + + if pods, found := job.Pods[taskName]; found { + delete(pods, pod.Name) + } + + return nil +} diff --git a/pkg/controllers/job/job_controller.go b/pkg/controllers/job/job_controller.go index 992c73d1a5..3bec24d63d 100644 --- a/pkg/controllers/job/job_controller.go +++ b/pkg/controllers/job/job_controller.go @@ -18,6 +18,7 @@ package job import ( "fmt" + "github.com/golang/glog" "k8s.io/api/core/v1" @@ -29,13 +30,13 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" kbver "github.com/kubernetes-sigs/kube-batch/pkg/client/clientset/versioned" kbinfoext "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions" kbinfo "github.com/kubernetes-sigs/kube-batch/pkg/client/informers/externalversions/scheduling/v1alpha1" kblister "github.com/kubernetes-sigs/kube-batch/pkg/client/listers/scheduling/v1alpha1" - vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" v1corev1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1" "hpw.cloud/volcano/pkg/apis/helpers" vkver "hpw.cloud/volcano/pkg/client/clientset/versioned" @@ -44,6 +45,9 @@ import ( vkcoreinfo "hpw.cloud/volcano/pkg/client/informers/externalversions/bus/v1alpha1" vkbatchlister "hpw.cloud/volcano/pkg/client/listers/batch/v1alpha1" vkcorelister "hpw.cloud/volcano/pkg/client/listers/bus/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" + jobcache "hpw.cloud/volcano/pkg/controllers/job/cache" + "hpw.cloud/volcano/pkg/controllers/job/queue" "hpw.cloud/volcano/pkg/controllers/job/state" ) @@ -83,8 +87,9 @@ type Controller struct { cmdLister vkcorelister.CommandLister cmdSynced func() bool - // eventQueue that need to sync up - eventQueue *cache.FIFO + // queue that need to sync up + queue workqueue.RateLimitingInterface + cache jobcache.Cache } // NewJobController create new Job Controller @@ -94,7 +99,8 @@ func NewJobController(config *rest.Config) *Controller { kubeClients: kubernetes.NewForConfigOrDie(config), vkClients: vkver.NewForConfigOrDie(config), kbClients: kbver.NewForConfigOrDie(config), - eventQueue: cache.NewFIFO(eventKey), + queue: queue.New(eventKey), + cache: jobcache.New(), } cc.jobInformer = vkinfoext.NewSharedInformerFactory(cc.vkClients, 0).Batch().V1alpha1().Jobs() @@ -194,57 +200,44 @@ func (cc *Controller) Run(stopCh <-chan struct{}) { glog.Infof("JobController is running ...... 
") } -type Request struct { - Namespace string - JobName string - PodName string - - Event vkbatchv1.Event - Action vkbatchv1.Action - - Reason string - Message string -} - func (cc *Controller) worker() { - obj := cache.Pop(cc.eventQueue) - if obj == nil { - glog.Errorf("Fail to pop item from eventQueue") + obj, shutdown := cc.queue.Get() + if shutdown { + glog.Errorf("Fail to pop item from queue") return } - req := obj.(*Request) + req := obj.(apis.Request) + defer cc.queue.Done(req) + + glog.V(3).Infof("Try to handle request <%v>", req) - job, err := cc.jobLister.Jobs(req.Namespace).Get(req.JobName) + jobInfo, err := cc.cache.Get(jobcache.JobKeyByReq(&req)) if err != nil { return } - st := state.NewState(job) - if st == nil { - glog.Errorf("Invalid state <%s> of Job <%v/%v>", - job.Status.State, job.Namespace, job.Name) + if jobInfo.Job == nil { + glog.V(3).Infof("Cache is out of sync for <%v>, retry it.", req) + cc.queue.AddRateLimited(req) return } - action := req.Action - if len(action) == 0 { - pod, err := cc.podLister.Pods(req.Namespace).Get(req.PodName) - if err != nil { - pod = nil - } - action = applyPolicies(req.Event, job, pod) + st := state.NewState(jobInfo) + if st == nil { + glog.Errorf("Invalid state <%s> of Job <%v/%v>", + jobInfo.Job.Status.State, jobInfo.Job.Namespace, jobInfo.Job.Name) + return } + action := applyPolicies(jobInfo.Job, &req) glog.V(3).Infof("Execute <%v> on Job <%s/%s> in <%s> by <%T>.", - action, req.Namespace, req.JobName, job.Status.State.Phase, st) + action, req.Namespace, req.JobName, jobInfo.Job.Status.State.Phase, st) - if err := st.Execute(action, req.Reason, req.Message); err != nil { + if err := st.Execute(action); err != nil { glog.Errorf("Failed to handle Job <%s/%s>: %v", - job.Namespace, job.Name, err) + jobInfo.Job.Namespace, jobInfo.Job.Name, err) // If any error, requeue it. 
- if e := cc.eventQueue.Add(req); e != nil { - glog.Errorf("Failed to reqeueue request <%v>", req) - } + cc.queue.AddRateLimited(req) } } diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go index 3901bbc96e..ce4a954026 100644 --- a/pkg/controllers/job/job_controller_actions.go +++ b/pkg/controllers/job/job_controller_actions.go @@ -19,52 +19,37 @@ package job import ( "fmt" "sync" - "time" "github.com/golang/glog" + kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - - kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" "hpw.cloud/volcano/pkg/apis/helpers" + "hpw.cloud/volcano/pkg/controllers/job/apis" "hpw.cloud/volcano/pkg/controllers/job/state" ) -func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error { - glog.V(3).Infof("Killing Job <%s/%s>", job.Namespace, job.Name) - defer glog.V(3).Infof("Finished Job <%s/%s> killing", job.Namespace, job.Name) - - job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) - if err != nil { - if apierrors.IsNotFound(err) { - glog.V(3).Infof("Job has been deleted: %v", job.Name) - return nil - } - return err - } +func (cc *Controller) killJob(jobInfo *apis.JobInfo, nextState state.NextStateFn) error { + glog.V(3).Infof("Killing Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) + defer glog.V(3).Infof("Finished Job <%s/%s> killing", jobInfo.Job.Namespace, jobInfo.Job.Name) + job := jobInfo.Job if job.DeletionTimestamp != nil { glog.Infof("Job <%s/%s> is terminating, skip management process.", job.Namespace, job.Name) return nil } - podsMap, err := getPodsForJob(cc.podLister, job) - if err != nil { - return err - } - var pending, running, terminating, succeeded, failed int32 var errs []error var total int - for _, pods := range podsMap { + for _, pods := range jobInfo.Pods { for _, pod := range pods { total++ @@ -111,18 +96,16 @@ func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error if nextState != nil { job.Status.State = nextState(job.Status) } - newState := job.Status.State // Update Job status - if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { + if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { glog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) return err - } - - if err := cc.waitForJobState(job, newState); err != nil { - glog.Errorf("Failed to sync Job's status.") - return err + } else { + if e := cc.cache.Update(job); e != nil { + return e + } } // Delete PodGroup @@ -148,17 +131,11 @@ func (cc *Controller) killJob(job *vkv1.Job, nextState state.NextStateFn) error return nil } -func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error { - glog.V(3).Infof("Starting to sync up Job <%s/%s>", job.Namespace, job.Name) - defer glog.V(3).Infof("Finished Job <%s/%s> sync up", job.Namespace, job.Name) +func (cc *Controller) syncJob(jobInfo *apis.JobInfo, nextState state.NextStateFn) error { + glog.V(3).Infof("Starting to sync up Job <%s/%s>", jobInfo.Job.Namespace, jobInfo.Job.Name) + defer glog.V(3).Infof("Finished Job <%s/%s> sync up", jobInfo.Job.Namespace, jobInfo.Job.Name) - job, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) - if err != nil { - if 
apierrors.IsNotFound(err) { - return nil - } - return err - } + job := jobInfo.Job if job.DeletionTimestamp != nil { glog.Infof("Job <%s/%s> is terminating, skip management process.", @@ -166,13 +143,6 @@ func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error return nil } - podsMap, err := getPodsForJob(cc.podLister, job) - if err != nil { - return err - } - - glog.V(3).Infof("Start to manage job <%s/%s>", job.Namespace, job.Name) - // TODO(k82cn): add WebHook to validate job. if err := validate(job); err != nil { glog.Errorf("Failed to validate Job <%s/%s>: %v", job.Namespace, job.Name, err) @@ -202,7 +172,7 @@ func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error name = vkv1.DefaultTaskSpec } - pods, found := podsMap[name] + pods, found := jobInfo.Pods[name] if !found { pods = map[string]*v1.Pod{} } @@ -309,19 +279,18 @@ func (cc *Controller) syncJob(job *vkv1.Job, nextState state.NextStateFn) error if nextState != nil { job.Status.State = nextState(job.Status) } - newState := job.Status.State - if _, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { + if job, err := cc.vkClients.BatchV1alpha1().Jobs(job.Namespace).Update(job); err != nil { glog.Errorf("Failed to update status of Job %v/%v: %v", job.Namespace, job.Name, err) return err + } else { + if e := cc.cache.Update(job); e != nil { + return e + } } - if err := cc.waitForJobState(job, newState); err != nil { - return err - } - - return err + return nil } func (cc *Controller) createServiceIfNotExist(job *vkv1.Job) error { @@ -459,24 +428,3 @@ func (cc *Controller) createPodGroupIfNotExist(job *vkv1.Job) error { return nil } - -// waitForJobState will wait for job's state changed to the target status. -// TODO(k82cn): enhance by cache/assume -func (cc *Controller) waitForJobState(job *vkv1.Job, newState vkv1.JobState) error { - if err := wait.Poll(100*time.Microsecond, 10*time.Second, func() (bool, error) { - newJob, err := cc.jobLister.Jobs(job.Namespace).Get(job.Name) - if err != nil { - return false, err - } - - if newJob.Status.State == newState { - return true, nil - } - - return false, nil - }); err != nil { - return err - } - - return nil -} diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 7e7189baa2..16d1f5d6a5 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -24,6 +24,7 @@ import ( vkbatchv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" vkbusv1 "hpw.cloud/volcano/pkg/apis/bus/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) func (cc *Controller) addCommand(obj interface{}) { @@ -33,7 +34,7 @@ func (cc *Controller) addCommand(obj interface{}) { return } - req := &Request{ + req := apis.Request{ Namespace: cmd.Namespace, JobName: cmd.TargetObject.Name, @@ -44,18 +45,15 @@ func (cc *Controller) addCommand(obj interface{}) { glog.V(3).Infof("Try to execute command <%v> on Job <%s/%s>", cmd.Action, req.Namespace, req.JobName) - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) - } + cc.queue.Add(req) go func() { + // TODO(k82cn): record event for this Command if err := cc.vkClients.BusV1alpha1().Commands(cmd.Namespace).Delete(cmd.Name, nil); err != nil { glog.Errorf("Failed to delete Command <%s/%s> which maybe executed again.", cmd.Namespace, cmd.Name) } }() - } func (cc *Controller) addJob(obj interface{}) { @@ -65,17 +63,19 @@ func (cc 
*Controller) addJob(obj interface{}) { return } - req := &Request{ + req := apis.Request{ Namespace: job.Namespace, JobName: job.Name, Event: vkbatchv1.OutOfSyncEvent, } - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) + // TODO(k82cn): if failed to add job, the cache should be refresh + if err := cc.cache.Add(job); err != nil { + glog.Errorf("Failed to add job <%s/%s>: %v", + job.Namespace, job.Name, err) } + cc.queue.Add(req) } func (cc *Controller) updateJob(oldObj, newObj interface{}) { @@ -85,18 +85,19 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { return } - req := &Request{ + req := apis.Request{ Namespace: newJob.Namespace, JobName: newJob.Name, Event: vkbatchv1.OutOfSyncEvent, } - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) + if err := cc.cache.Update(newJob); err != nil { + glog.Errorf("Failed to update job <%s/%s>: %v", + newJob.Namespace, newJob.Name, err) } + cc.queue.Add(req) } func (cc *Controller) deleteJob(obj interface{}) { @@ -106,16 +107,9 @@ func (cc *Controller) deleteJob(obj interface{}) { return } - req := &Request{ - Namespace: job.Namespace, - JobName: job.Name, - - Event: vkbatchv1.OutOfSyncEvent, - } - - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) + if err := cc.cache.Delete(job); err != nil { + glog.Errorf("Failed to delete job <%s/%s>: %v", + job.Namespace, job.Name, err) } } @@ -128,21 +122,23 @@ func (cc *Controller) addPod(obj interface{}) { jobName, found := pod.Annotations[vkbatchv1.JobNameKey] if !found { + glog.Errorf("Failed to find jobName of Pod <%s/%s>", + pod.Namespace, pod.Name) return } - req := &Request{ + req := apis.Request{ Namespace: pod.Namespace, JobName: jobName, - PodName: pod.Name, Event: vkbatchv1.OutOfSyncEvent, } - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) + if err := cc.cache.AddPod(pod); err != nil { + glog.Errorf("Failed to add Pod <%s/%s>: %v", + pod.Namespace, pod.Name, err) } + cc.queue.Add(req) } func (cc *Controller) updatePod(oldObj, newObj interface{}) { @@ -158,8 +154,17 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { return } + taskName, found := newPod.Annotations[vkbatchv1.TaskSpecKey] + if !found { + glog.Errorf("Failed to find taskName of Pod <%s/%s>", + newPod.Namespace, newPod.Name) + return + } + jobName, found := newPod.Annotations[vkbatchv1.JobNameKey] if !found { + glog.Errorf("Failed to find jobName of Pod <%s/%s>", + newPod.Namespace, newPod.Name) return } @@ -169,17 +174,20 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) { event = vkbatchv1.PodFailedEvent } - req := &Request{ + req := apis.Request{ Namespace: newPod.Namespace, JobName: jobName, - PodName: newPod.Name, + TaskName: taskName, Event: event, } - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) + + if err := cc.cache.UpdatePod(newPod); err != nil { + glog.Errorf("Failed to update Pod <%s/%s>: %v", + newPod.Namespace, newPod.Name, err) } + + cc.queue.Add(req) } func (cc *Controller) deletePod(obj interface{}) { @@ -199,23 +207,34 @@ func (cc *Controller) deletePod(obj interface{}) { return } + taskName, found := pod.Annotations[vkbatchv1.TaskSpecKey] + if !found { + glog.Errorf("Failed to find taskName of Pod <%s/%s>", + pod.Namespace, 
pod.Name) + return + } + jobName, found := pod.Annotations[vkbatchv1.JobNameKey] if !found { + glog.Errorf("Failed to find jobName of Pod <%s/%s>", + pod.Namespace, pod.Name) return } - req := &Request{ + req := apis.Request{ Namespace: pod.Namespace, JobName: jobName, - PodName: pod.Name, + TaskName: taskName, Event: vkbatchv1.PodEvictedEvent, } - if err := cc.eventQueue.Add(req); err != nil { - glog.Errorf("Failed to add request <%v> into queue: %v", - req, err) + if err := cc.cache.DeletePod(pod); err != nil { + glog.Errorf("Failed to update Pod <%s/%s>: %v", + pod.Namespace, pod.Name, err) } + + cc.queue.Add(req) } // TODO(k82cn): add handler for PodGroup unschedulable event. diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 0e9598659d..9c8efe4012 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -19,17 +19,14 @@ package job import ( "fmt" - "github.com/golang/glog" - "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - corelisters "k8s.io/client-go/listers/core/v1" kbapi "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1" vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" "hpw.cloud/volcano/pkg/apis/helpers" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) func validate(job *vkv1.Job) error { @@ -46,13 +43,16 @@ func validate(job *vkv1.Job) error { return nil } -func eventKey(obj interface{}) (string, error) { - req, ok := obj.(*Request) +func eventKey(obj interface{}) interface{} { + req, ok := obj.(apis.Request) if !ok { - return "", fmt.Errorf("failed to convert %v to *Request", obj) + return obj } - return fmt.Sprintf("%s/%s", req.Namespace, req.JobName), nil + return apis.Request{ + Namespace: req.Namespace, + JobName: req.JobName, + } } func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { @@ -160,59 +160,28 @@ func createJobPod(job *vkv1.Job, template *v1.PodTemplateSpec, ix int) *v1.Pod { return pod } -func getPodsForJob(podLister corelisters.PodLister, job *vkv1.Job) (map[string]map[string]*v1.Pod, error) { - pods := map[string]map[string]*v1.Pod{} - - // TODO (k82cn): optimist by cache and index of owner; add 'ControlledBy' extended interface. 
- ps, err := podLister.Pods(job.Namespace).List(labels.Everything()) - if err != nil { - return nil, err - } - - for _, pod := range ps { - if !metav1.IsControlledBy(pod, job) { - continue - } - if len(pod.Annotations) == 0 { - glog.Errorf("The annotations of pod <%s/%s> is empty", pod.Namespace, pod.Name) - continue - } - tsName, found := pod.Annotations[vkv1.TaskSpecKey] - if found { - // Hash by TaskSpec.Template.Name - if _, exist := pods[tsName]; !exist { - pods[tsName] = make(map[string]*v1.Pod) - } - pods[tsName][pod.Name] = pod - } +func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { + if len(req.Action) != 0 { + return req.Action } - return pods, nil -} - -func applyPolicies(event vkv1.Event, job *vkv1.Job, pod *v1.Pod) vkv1.Action { // Overwrite Job level policies - if pod != nil { + if len(req.TaskName) != 0 { // Parse task level policies - if taskName, found := pod.Annotations[vkv1.TaskSpecKey]; found { - for _, task := range job.Spec.Tasks { - if task.Name == taskName { - for _, policy := range task.Policies { - if policy.Event == event || policy.Event == vkv1.AnyEvent { - return policy.Action - } + for _, task := range job.Spec.Tasks { + if task.Name == req.TaskName { + for _, policy := range task.Policies { + if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { + return policy.Action } } } - } else { - glog.Errorf("Failed to find taskSpecKey in Pod <%s/%s>", - pod.Namespace, pod.Name) } } // Parse Job level policies for _, policy := range job.Spec.Policies { - if policy.Event == event || policy.Event == vkv1.AnyEvent { + if policy.Event == req.Event || policy.Event == vkv1.AnyEvent { return policy.Action } } diff --git a/pkg/controllers/job/queue/queue.go b/pkg/controllers/job/queue/queue.go new file mode 100644 index 0000000000..b477543063 --- /dev/null +++ b/pkg/controllers/job/queue/queue.go @@ -0,0 +1,160 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queue + +import ( + "sync" + "time" + + "github.com/golang/glog" + + "k8s.io/client-go/util/workqueue" +) + +type queue struct { + sync.Mutex + + index workqueue.RateLimitingInterface + router map[interface{}]workqueue.Interface + indexFn func(interface{}) interface{} +} + +func New(indexFn func(interface{}) interface{}) workqueue.RateLimitingInterface { + return &queue{ + index: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + router: make(map[interface{}]workqueue.Interface), + indexFn: indexFn, + } +} + +func (q *queue) getQueue(key interface{}) workqueue.Interface { + q.Lock() + defer q.Unlock() + + if _, found := q.router[key]; !found { + q.router[key] = workqueue.New() + } + + return q.router[key] +} + +// AddRateLimited adds an item to the workqueue after the rate limiter says its ok +func (q *queue) AddRateLimited(item interface{}) { + glog.V(3).Infof("queue.AddRateLimited entering ... 
") + defer glog.V(3).Infof("queue.AddRateLimited finished") + + key := q.indexFn(item) + + q.index.AddRateLimited(key) + q.getQueue(key).Add(item) +} + +// Forget indicates that an item is finished being retried. Doesn't matter whether its for perm failing +// or for success, we'll stop the rate limiter from tracking it. This only clears the `rateLimiter`, you +// still have to call `Done` on the queue. +func (q *queue) Forget(item interface{}) { + glog.V(3).Infof("queue.Forget entering ... ") + defer glog.V(3).Infof("queue.Forget finished") + + key := q.indexFn(item) + q.index.Forget(key) +} + +// NumRequeues returns back how many times the item was requeued +func (q *queue) NumRequeues(item interface{}) int { + glog.V(3).Infof("queue.NumRequeues entering ... ") + defer glog.V(3).Infof("queue.NumRequeues finished") + + return q.index.NumRequeues(q.indexFn(item)) +} + +// AddAfter adds an item to the workqueue after the indicated duration has passed +func (q *queue) AddAfter(item interface{}, duration time.Duration) { + glog.V(3).Infof("queue.AddAfter entering ... ") + defer glog.V(3).Infof("queue.AddAfter finished") + + key := q.indexFn(item) + q.index.AddAfter(key, duration) + q.getQueue(key).Add(item) +} + +func (q *queue) Add(item interface{}) { + glog.V(3).Infof("queue.Add <%v> entering ... ", item) + defer glog.V(3).Infof("queue.Add <%v> finished", item) + + key := q.indexFn(item) + q.index.Add(key) + q.getQueue(key).Add(item) +} + +func (q *queue) Len() int { + glog.V(3).Infof("queue.Len entering ... ") + defer glog.V(3).Infof("queue.Len finished") + + q.Lock() + defer q.Unlock() + + sum := 0 + + for _, d := range q.router { + sum += d.Len() + } + + return sum +} + +func (q *queue) Get() (item interface{}, shutdown bool) { + glog.V(3).Infof("queue.Get entering ... ") + defer glog.V(3).Infof("queue.Get finished") + + key, sd := q.index.Get() + if sd { + return key, sd + } + + return q.getQueue(key).Get() +} + +func (q *queue) Done(item interface{}) { + glog.V(3).Infof("queue.Done entering ... ") + defer glog.V(3).Infof("queue.Done finished") + + key := q.indexFn(item) + q.index.Done(key) + q.getQueue(key).Done(item) + + q.Lock() + defer q.Unlock() + // If still router, add it back. + if q.router[key].Len() != 0 { + q.index.Add(key) + } +} + +func (q *queue) ShutDown() { + glog.V(3).Infof("queue.ShutDown entering ... ") + defer glog.V(3).Infof("queue.ShutDown finished") + + q.index.ShutDown() +} + +func (q *queue) ShuttingDown() bool { + glog.V(3).Infof("queue.ShuttingDown entering ... 
") + defer glog.V(3).Infof("queue.ShuttingDown finished") + + return q.index.ShuttingDown() +} diff --git a/pkg/controllers/job/state/aborted.go b/pkg/controllers/job/state/aborted.go index 7a5a5a90a5..a701e3623b 100644 --- a/pkg/controllers/job/state/aborted.go +++ b/pkg/controllers/job/state/aborted.go @@ -18,20 +18,19 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type abortedState struct { - job *vkv1.Job + job *apis.JobInfo } -func (as *abortedState) Execute(action vkv1.Action, reason string, msg string) (error) { +func (as *abortedState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: return SyncJob(as.job, func(status vkv1.JobStatus) vkv1.JobState { return vkv1.JobState{ - Phase: vkv1.Restarting, - Reason: reason, - Message: msg, + Phase: vkv1.Restarting, } }) default: diff --git a/pkg/controllers/job/state/aborting.go b/pkg/controllers/job/state/aborting.go index aa7126fba6..d0d6f2f2e6 100644 --- a/pkg/controllers/job/state/aborting.go +++ b/pkg/controllers/job/state/aborting.go @@ -18,21 +18,20 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type abortingState struct { - job *vkv1.Job + job *apis.JobInfo } -func (ps *abortingState) Execute(action vkv1.Action, reason string, msg string) (error) { +func (ps *abortingState) Execute(action vkv1.Action) error { switch action { case vkv1.ResumeJobAction: // Already in Restarting phase, just sync it return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { return vkv1.JobState{ - Phase: vkv1.Restarting, - Reason: reason, - Message: msg, + Phase: vkv1.Restarting, } }) default: @@ -40,16 +39,12 @@ func (ps *abortingState) Execute(action vkv1.Action, reason string, msg string) // If any "alive" pods, still in Aborting phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { return vkv1.JobState{ - Phase: vkv1.Aborting, - Reason: reason, - Message: msg, + Phase: vkv1.Aborting, } } return vkv1.JobState{ - Phase: vkv1.Aborted, - Reason: reason, - Message: msg, + Phase: vkv1.Aborted, } }) } diff --git a/pkg/controllers/job/state/factory.go b/pkg/controllers/job/state/factory.go index 1d56a2523b..cc8b1c1dbe 100644 --- a/pkg/controllers/job/state/factory.go +++ b/pkg/controllers/job/state/factory.go @@ -18,10 +18,11 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type NextStateFn func(status vkv1.JobStatus) vkv1.JobState -type ActionFn func(job *vkv1.Job, fn NextStateFn) error +type ActionFn func(job *apis.JobInfo, fn NextStateFn) error var ( // SyncJob will create or delete Pods according to Job's spec. @@ -32,27 +33,28 @@ var ( type State interface { // Execute executes the actions based on current state. 
- Execute(act vkv1.Action, reason string, msg string) error + Execute(act vkv1.Action) error } -func NewState(job *vkv1.Job) State { +func NewState(jobInfo *apis.JobInfo) State { + job := jobInfo.Job switch job.Status.State.Phase { case vkv1.Pending: - return &pendingState{job: job} + return &pendingState{job: jobInfo} case vkv1.Running: - return &runningState{job: job} + return &runningState{job: jobInfo} case vkv1.Restarting: - return &restartingState{job: job} + return &restartingState{job: jobInfo} case vkv1.Terminated, vkv1.Completed: - return &finishedState{job: job} + return &finishedState{job: jobInfo} case vkv1.Terminating: - return &terminatingState{job: job} + return &terminatingState{job: jobInfo} case vkv1.Aborting: - return &abortingState{job: job} + return &abortingState{job: jobInfo} case vkv1.Aborted: - return &abortedState{job: job} + return &abortedState{job: jobInfo} } // It's pending by default. - return &pendingState{job: job} + return &pendingState{job: jobInfo} } diff --git a/pkg/controllers/job/state/finished.go b/pkg/controllers/job/state/finished.go index 01cef3d3d4..1d13c3f359 100644 --- a/pkg/controllers/job/state/finished.go +++ b/pkg/controllers/job/state/finished.go @@ -18,14 +18,14 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type finishedState struct { - job *vkv1.Job + job *apis.JobInfo } -func (ps *finishedState) Execute(action vkv1.Action, reason string, msg string) (error) { +func (ps *finishedState) Execute(action vkv1.Action) error { // In finished state, e.g. Completed, always kill the whole job. return KillJob(ps.job, nil) } - diff --git a/pkg/controllers/job/state/pending.go b/pkg/controllers/job/state/pending.go index ac6eabc86c..b730d8d683 100644 --- a/pkg/controllers/job/state/pending.go +++ b/pkg/controllers/job/state/pending.go @@ -18,13 +18,14 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type pendingState struct { - job *vkv1.Job + job *apis.JobInfo } -func (ps *pendingState) Execute(action vkv1.Action, reason string, msg string) error { +func (ps *pendingState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { @@ -34,9 +35,7 @@ func (ps *pendingState) Execute(action vkv1.Action, reason string, msg string) e } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) @@ -48,23 +47,19 @@ func (ps *pendingState) Execute(action vkv1.Action, reason string, msg string) e } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) default: return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { - total := totalTasks(ps.job) + total := totalTasks(ps.job.Job) phase := vkv1.Pending if total == status.Running { phase = vkv1.Running } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) } diff --git a/pkg/controllers/job/state/restarting.go b/pkg/controllers/job/state/restarting.go index bd12f8d5c6..1e46882454 100644 --- a/pkg/controllers/job/state/restarting.go +++ b/pkg/controllers/job/state/restarting.go @@ -18,13 +18,14 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type restartingState struct { - job *vkv1.Job + job *apis.JobInfo } -func (ps *restartingState) Execute(action vkv1.Action, reason string, 
msg string) ( error) { +func (ps *restartingState) Execute(action vkv1.Action) error { return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { phase := vkv1.Restarting if status.Terminating == 0 { @@ -36,9 +37,7 @@ func (ps *restartingState) Execute(action vkv1.Action, reason string, msg string } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) diff --git a/pkg/controllers/job/state/running.go b/pkg/controllers/job/state/running.go index 422b43652d..814015dc31 100644 --- a/pkg/controllers/job/state/running.go +++ b/pkg/controllers/job/state/running.go @@ -18,13 +18,14 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type runningState struct { - job *vkv1.Job + job *apis.JobInfo } -func (ps *runningState) Execute(action vkv1.Action, reason string, msg string) (error) { +func (ps *runningState) Execute(action vkv1.Action) error { switch action { case vkv1.RestartJobAction: return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { @@ -34,9 +35,7 @@ func (ps *runningState) Execute(action vkv1.Action, reason string, msg string) ( } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) case vkv1.AbortJobAction: @@ -47,9 +46,7 @@ func (ps *runningState) Execute(action vkv1.Action, reason string, msg string) ( } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) case vkv1.TerminateJobAction: @@ -60,22 +57,18 @@ func (ps *runningState) Execute(action vkv1.Action, reason string, msg string) ( } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) default: return SyncJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { phase := vkv1.Running - if status.Succeeded+status.Failed == totalTasks(ps.job) { + if status.Succeeded+status.Failed == totalTasks(ps.job.Job) { phase = vkv1.Completed } return vkv1.JobState{ - Phase: phase, - Reason: reason, - Message: msg, + Phase: phase, } }) } diff --git a/pkg/controllers/job/state/terminating.go b/pkg/controllers/job/state/terminating.go index 5a7cd9d33a..01ba667217 100644 --- a/pkg/controllers/job/state/terminating.go +++ b/pkg/controllers/job/state/terminating.go @@ -18,27 +18,24 @@ package state import ( vkv1 "hpw.cloud/volcano/pkg/apis/batch/v1alpha1" + "hpw.cloud/volcano/pkg/controllers/job/apis" ) type terminatingState struct { - job *vkv1.Job + job *apis.JobInfo } -func (ps *terminatingState) Execute(action vkv1.Action, reason string, msg string) (error) { +func (ps *terminatingState) Execute(action vkv1.Action) error { return KillJob(ps.job, func(status vkv1.JobStatus) vkv1.JobState { // If any "alive" pods, still in Terminating phase if status.Terminating != 0 || status.Pending != 0 || status.Running != 0 { return vkv1.JobState{ - Phase: vkv1.Terminating, - Reason: reason, - Message: msg, + Phase: vkv1.Terminating, } } return vkv1.JobState{ - Phase: vkv1.Terminated, - Reason: reason, - Message: msg, + Phase: vkv1.Terminated, } }) }
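
Below is a minimal usage sketch, not part of the diff, of how the new job cache is meant to be fed by the informer handlers (`addJob`, `addPod`) and read back by the worker. It assumes the packages introduced above plus the usual embedded `metav1.ObjectMeta` on `v1alpha1.Job`; the namespace, job name, and pod name are made up for illustration.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
	jobcache "hpw.cloud/volcano/pkg/controllers/job/cache"
)

func main() {
	c := jobcache.New()

	// addJob handler path: register the Job itself.
	job := &v1alpha1.Job{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "mpi"},
	}
	if err := c.Add(job); err != nil {
		fmt.Println(err)
		return
	}

	// addPod handler path: pods are routed to their Job and task via annotations.
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "default",
			Name:      "mpi-worker-0",
			Annotations: map[string]string{
				v1alpha1.JobNameKey:  "mpi",    // maps the pod back to its Job
				v1alpha1.TaskSpecKey: "worker", // maps the pod to its task
			},
		},
	}
	if err := c.AddPod(pod); err != nil {
		fmt.Println(err)
		return
	}

	// Worker path: Get returns a JobInfo whose Pods maps are copied (the Job
	// pointer itself is shared), so the worker can iterate the pods without
	// racing against the handlers above.
	info, err := c.Get(jobcache.JobKey(job))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("job %s has %d pod(s) for task %q\n",
		info.Job.Name, len(info.Pods["worker"]), "worker")
}
```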
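
Along the same lines, a minimal sketch (again not part of the diff) of the per-job queue: requests are indexed by their `<namespace, jobName>` pair, so rate limiting and de-duplication happen per Job and two requests for the same Job are never handled concurrently, while the full `apis.Request` (task name, event) waits in a per-key FIFO. The `indexFn` here mirrors `eventKey` in `job_controller_util.go`; the job and task names are illustrative.

```go
package main

import (
	"fmt"

	"hpw.cloud/volcano/pkg/apis/batch/v1alpha1"
	"hpw.cloud/volcano/pkg/controllers/job/apis"
	"hpw.cloud/volcano/pkg/controllers/job/queue"
)

// indexFn collapses a Request to its <namespace, jobName> pair, like eventKey:
// that pair is what the rate-limiting index queue tracks, while the detailed
// request stays in the per-key FIFO behind it.
func indexFn(obj interface{}) interface{} {
	req, ok := obj.(apis.Request)
	if !ok {
		return obj
	}
	return apis.Request{Namespace: req.Namespace, JobName: req.JobName}
}

func main() {
	q := queue.New(indexFn)

	// Two different events for the same Job share one index key.
	q.Add(apis.Request{Namespace: "default", JobName: "mpi", TaskName: "master", Event: v1alpha1.OutOfSyncEvent})
	q.Add(apis.Request{Namespace: "default", JobName: "mpi", TaskName: "worker", Event: v1alpha1.PodFailedEvent})

	for i := 0; i < 2; i++ {
		item, shutdown := q.Get()
		if shutdown {
			return
		}
		req := item.(apis.Request)
		fmt.Printf("handling %v on %s/%s (task %s)\n",
			req.Event, req.Namespace, req.JobName, req.TaskName)

		// Done releases both the request and its index key; if the per-job FIFO
		// still has pending requests, the key is re-queued so they get drained.
		q.Done(item)
	}

	q.ShutDown()
}
```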