diff --git a/Makefile b/Makefile index 956318ed0e..64e02fa93d 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,8 @@ BIN_DIR=_output/bin -export IMAGE=volcano -export TAG = 1.0 +IMAGE=volcano +TAG = 0.1 + +.EXPORT_ALL_VARIABLES: all: controllers scheduler cli admission diff --git a/hack/run-e2e-kind.sh b/hack/run-e2e-kind.sh index 8c1c667e85..f67f377ec2 100755 --- a/hack/run-e2e-kind.sh +++ b/hack/run-e2e-kind.sh @@ -4,6 +4,7 @@ export VK_ROOT=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/.. export VK_BIN=${VK_ROOT}/_output/bin export LOG_LEVEL=3 export SHOW_VOLCANO_LOGS=${SHOW_VOLCANO_LOGS:-1} +export CLEANUP_CLUSTER=${CLEANUP_CLUSTER:-1} if [[ "${CLUSTER_NAME}xxx" != "xxx" ]];then export CLUSTER_CONTEXT="--name ${CLUSTER_NAME}" @@ -61,7 +62,7 @@ function install-volcano { helm gen-admission-secret --service integration-admission-service --namespace kube-system echo "Install volcano chart" - helm install installer/chart/volcano --namespace kube-system --name integration --kubeconfig ${KUBECONFIG} + helm install installer/chart/volcano --namespace kube-system --name integration --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} } function uninstall-volcano { @@ -98,8 +99,9 @@ Disable displaying volcano component logs: exit 0 fi - -trap cleanup EXIT +if [[ $CLEANUP_CLUSTER -eq 1 ]]; then + trap cleanup EXIT +fi kind-up-cluster diff --git a/installer/chart/volcano/config/kube-batch.conf b/installer/chart/volcano/config/kube-batch.conf index 25df16501b..768db3ef4d 100644 --- a/installer/chart/volcano/config/kube-batch.conf +++ b/installer/chart/volcano/config/kube-batch.conf @@ -3,7 +3,9 @@ tiers: - plugins: - name: priority - name: gang + - name: conformance - plugins: - name: drf - name: predicates - name: proportion + - name: nodeorder diff --git a/pkg/admission/admit_job.go b/pkg/admission/admit_job.go index e28e2dafcc..eb46b79751 100644 --- a/pkg/admission/admit_job.go +++ b/pkg/admission/admit_job.go @@ -69,11 +69,6 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st taskNames := map[string]string{} var totalReplicas int32 - if job.Status.Version != 0 { - reviewResponse.Allowed = false - return fmt.Sprintf("Job Version is used internally, should not be specified.") - } - if len(job.Spec.Tasks) == 0 { reviewResponse.Allowed = false return fmt.Sprintf("No task specified in job spec") diff --git a/pkg/admission/mutate_job.go b/pkg/admission/mutate_job.go index 10bccf5472..00af6a0065 100644 --- a/pkg/admission/mutate_job.go +++ b/pkg/admission/mutate_job.go @@ -72,7 +72,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { func createPatch(job v1alpha1.Job) ([]byte, error) { var patch []patchOperation - patch = append(patch, mutateJobVersion(job.Status, "/status")...) patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...) patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...) @@ -96,16 +95,6 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat return patch } -func mutateJobVersion(jobStatus v1alpha1.JobStatus, basePath string) (patch []patchOperation) { - jobStatus.Version = 1 - patch = append(patch, patchOperation{ - Op: "replace", - Path: basePath, - Value: jobStatus, - }) - return patch -} - func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) { if len(metadata.Annotations) == 0 { metadata.Annotations = make(map[string]string) diff --git a/pkg/apis/batch/v1alpha1/job.go b/pkg/apis/batch/v1alpha1/job.go index f29814acaa..8f04c5607a 100644 --- a/pkg/apis/batch/v1alpha1/job.go +++ b/pkg/apis/batch/v1alpha1/job.go @@ -228,7 +228,7 @@ type JobStatus struct { // +optional Terminating int32 `json:"terminating,omitempty" protobuf:"bytes,7,opt,name=terminating"` //Current version of job - Version int32 + Version int32 `json:"version,omitempty" protobuf:"bytes,8,opt,name=version"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controllers/job/cache/cache.go b/pkg/controllers/job/cache/cache.go index ab34966f7c..a1f022b4a8 100644 --- a/pkg/controllers/job/cache/cache.go +++ b/pkg/controllers/job/cache/cache.go @@ -231,6 +231,9 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) { } func (jc jobCache) TaskCompleted(jobKey, taskName string) bool { + jc.Lock() + defer jc.Unlock() + var taskReplicas, completed int32 jobInfo, found := jc.jobs[jobKey] @@ -244,6 +247,10 @@ func (jc jobCache) TaskCompleted(jobKey, taskName string) bool { return false } + if jobInfo.Job == nil { + return false + } + for _, task := range jobInfo.Job.Spec.Tasks { if task.Name == taskName { taskReplicas = task.Replicas diff --git a/pkg/controllers/job/job_controller_handler.go b/pkg/controllers/job/job_controller_handler.go index 3e512f4cdc..cbf868c483 100644 --- a/pkg/controllers/job/job_controller_handler.go +++ b/pkg/controllers/job/job_controller_handler.go @@ -78,6 +78,11 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { return } + if err := cc.cache.Update(newJob); err != nil { + glog.Errorf("Failed to update job <%s/%s>: %v in cache", + newJob.Namespace, newJob.Name, err) + } + //NOTE: Since we only reconcile job based on Spec, we will ignore other attributes // For Job status, it's used internally and always been updated via our controller. if reflect.DeepEqual(newJob.Spec, oldJob.Spec) { @@ -85,11 +90,6 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) { return } - if err := cc.cache.Update(newJob); err != nil { - glog.Errorf("Failed to update job <%s/%s>: %v in cache", - newJob.Namespace, newJob.Name, err) - } - req := apis.Request{ Namespace: newJob.Namespace, JobName: newJob.Name, diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go index 04155606ed..5a104c5b7c 100644 --- a/pkg/controllers/job/job_controller_util.go +++ b/pkg/controllers/job/job_controller_util.go @@ -166,7 +166,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action { } //For all the requests triggered from discarded job resources will perform sync action instead - if req.JobVersion > 0 && req.JobVersion < job.Status.Version { + if req.JobVersion < job.Status.Version { glog.Infof("Request %s is outdated, will perform sync instead.", req) return vkv1.SyncJobAction } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 7289d92a03..b408f61aae 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -232,10 +232,9 @@ func preempt( } preempted.Add(preemptee.Resreq) // If reclaimed enough resources, break loop to avoid Sub panic. - if resreq.LessEqual(preemptee.Resreq) { + if resreq.LessEqual(preempted) { break } - resreq.Sub(preemptee.Resreq) } metrics.RegisterPreemptionAttempts() diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 4661db32ba..68e96c8f98 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -166,10 +166,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) { } reclaimed.Add(reclaimee.Resreq) // If reclaimed enough resources, break loop to avoid Sub panic. - if resreq.LessEqual(reclaimee.Resreq) { + if resreq.LessEqual(reclaimed) { break } - resreq.Sub(reclaimee.Resreq) } glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.", diff --git a/test/e2e/admission.go b/test/e2e/admission.go new file mode 100644 index 0000000000..c963746159 --- /dev/null +++ b/test/e2e/admission.go @@ -0,0 +1,125 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/errors" + "volcano.sh/volcano/pkg/apis/batch/v1alpha1" +) + +var _ = Describe("Job E2E Test: Test Admission service", func() { + It("Duplicated Task Name", func() { + jobName := "job-duplicated" + namespace := "test" + context := initTestContext() + defer cleanupTestContext(context) + rep := clusterSize(context, oneCPU) + + _, err := createJobInner(context, &jobSpec{ + namespace: namespace, + name: jobName, + tasks: []taskSpec{ + { + img: defaultNginxImage, + req: oneCPU, + min: rep, + rep: rep, + name: "duplicated", + }, + { + img: defaultNginxImage, + req: oneCPU, + min: rep, + rep: rep, + name: "duplicated", + }, + }, + }) + Expect(err).To(HaveOccurred()) + stError, ok := err.(*errors.StatusError) + Expect(ok).To(Equal(true)) + Expect(stError.ErrStatus.Code).To(Equal(int32(500))) + Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated task name")) + }) + + It("Duplicated Policy Event", func() { + jobName := "job-policy-duplicated" + namespace := "test" + context := initTestContext() + defer cleanupTestContext(context) + rep := clusterSize(context, oneCPU) + + _, err := createJobInner(context, &jobSpec{ + namespace: namespace, + name: jobName, + tasks: []taskSpec{ + { + img: defaultNginxImage, + req: oneCPU, + min: rep, + rep: rep, + name: "taskname", + }, + }, + policies: []v1alpha1.LifecyclePolicy{ + { + Event: v1alpha1.PodFailedEvent, + Action: v1alpha1.AbortJobAction, + }, + { + Event: v1alpha1.PodFailedEvent, + Action: v1alpha1.RestartJobAction, + }, + }, + }) + Expect(err).To(HaveOccurred()) + stError, ok := err.(*errors.StatusError) + Expect(ok).To(Equal(true)) + Expect(stError.ErrStatus.Code).To(Equal(int32(500))) + Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated job event policies")) + }) + + It("Min Available illegal", func() { + jobName := "job-min-illegal" + namespace := "test" + context := initTestContext() + defer cleanupTestContext(context) + rep := clusterSize(context, oneCPU) + + _, err := createJobInner(context, &jobSpec{ + min: rep * 2, + namespace: namespace, + name: jobName, + tasks: []taskSpec{ + { + img: defaultNginxImage, + req: oneCPU, + min: rep, + rep: rep, + name: "taskname", + }, + }, + }) + Expect(err).To(HaveOccurred()) + stError, ok := err.(*errors.StatusError) + Expect(ok).To(Equal(true)) + Expect(stError.ErrStatus.Code).To(Equal(int32(500))) + Expect(stError.ErrStatus.Message).To(ContainSubstring("'minAvailable' should not be greater than total replicas in tasks")) + }) +}) diff --git a/test/e2e/util.go b/test/e2e/util.go index 8393e8b087..543449cc53 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -221,11 +221,11 @@ func cleanupTestContext(cxt *context) { Expect(err).NotTo(HaveOccurred()) // Wait for namespace deleted. - err = wait.Poll(100*time.Millisecond, oneMinute, namespaceNotExist(cxt)) + err = wait.Poll(100*time.Millisecond, twoMinute, namespaceNotExist(cxt)) Expect(err).NotTo(HaveOccurred()) // Wait for queues deleted - err = wait.Poll(100*time.Millisecond, oneMinute, queueNotExist(cxt)) + err = wait.Poll(100*time.Millisecond, twoMinute, queueNotExist(cxt)) Expect(err).NotTo(HaveOccurred()) } @@ -294,6 +294,7 @@ type jobSpec struct { queue string tasks []taskSpec policies []vkv1.LifecyclePolicy + min int32 } func getNS(context *context, job *jobSpec) string { @@ -311,6 +312,14 @@ func getNS(context *context, job *jobSpec) string { } func createJob(context *context, jobSpec *jobSpec) *vkv1.Job { + + job, err := createJobInner(context, jobSpec) + Expect(err).NotTo(HaveOccurred()) + + return job +} + +func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) { ns := getNS(context, jobSpec) job := &vkv1.Job{ @@ -366,12 +375,13 @@ func createJob(context *context, jobSpec *jobSpec) *vkv1.Job { min += task.min } - job.Spec.MinAvailable = min - - job, err := context.vkclient.BatchV1alpha1().Jobs(job.Namespace).Create(job) - Expect(err).NotTo(HaveOccurred()) + if jobSpec.min > 0 { + job.Spec.MinAvailable = jobSpec.min + } else { + job.Spec.MinAvailable = min + } - return job + return context.vkclient.BatchV1alpha1().Jobs(job.Namespace).Create(job) } func taskPhase(ctx *context, job *vkv1.Job, phase []v1.PodPhase, taskNum int) wait.ConditionFunc {