Skip to content

Commit

Permalink
Merge pull request #45 from volcano-sh/feature/support-admission-test
Browse files Browse the repository at this point in the history
Add admission e2e test
  • Loading branch information
Klaus Ma authored Mar 26, 2019
2 parents fc95697 + 03134d0 commit a045f24
Show file tree
Hide file tree
Showing 13 changed files with 169 additions and 39 deletions.
6 changes: 4 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
BIN_DIR=_output/bin
export IMAGE=volcano
export TAG = 1.0
IMAGE=volcano
TAG = 0.1

.EXPORT_ALL_VARIABLES:

all: controllers scheduler cli admission

Expand Down
8 changes: 5 additions & 3 deletions hack/run-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export VK_ROOT=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/..
export VK_BIN=${VK_ROOT}/_output/bin
export LOG_LEVEL=3
export SHOW_VOLCANO_LOGS=${SHOW_VOLCANO_LOGS:-1}
export CLEANUP_CLUSTER=${CLEANUP_CLUSTER:-1}

if [[ "${CLUSTER_NAME}xxx" != "xxx" ]];then
export CLUSTER_CONTEXT="--name ${CLUSTER_NAME}"
Expand Down Expand Up @@ -61,7 +62,7 @@ function install-volcano {
helm gen-admission-secret --service integration-admission-service --namespace kube-system

echo "Install volcano chart"
helm install installer/chart/volcano --namespace kube-system --name integration --kubeconfig ${KUBECONFIG}
helm install installer/chart/volcano --namespace kube-system --name integration --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG}
}

function uninstall-volcano {
Expand Down Expand Up @@ -98,8 +99,9 @@ Disable displaying volcano component logs:
exit 0
fi


trap cleanup EXIT
if [[ $CLEANUP_CLUSTER -eq 1 ]]; then
trap cleanup EXIT
fi


kind-up-cluster
Expand Down
2 changes: 2 additions & 0 deletions installer/chart/volcano/config/kube-batch.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
5 changes: 0 additions & 5 deletions pkg/admission/admit_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,6 @@ func validateJob(job v1alpha1.Job, reviewResponse *v1beta1.AdmissionResponse) st
taskNames := map[string]string{}
var totalReplicas int32

if job.Status.Version != 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("Job Version is used internally, should not be specified.")
}

if len(job.Spec.Tasks) == 0 {
reviewResponse.Allowed = false
return fmt.Sprintf("No task specified in job spec")
Expand Down
11 changes: 0 additions & 11 deletions pkg/admission/mutate_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ func MutateJobs(ar v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {

func createPatch(job v1alpha1.Job) ([]byte, error) {
var patch []patchOperation
patch = append(patch, mutateJobVersion(job.Status, "/status")...)
patch = append(patch, mutateSpec(job.Spec.Tasks, "/spec/tasks")...)
patch = append(patch, mutateMetadata(job.ObjectMeta, "/metadata")...)

Expand All @@ -96,16 +95,6 @@ func mutateSpec(tasks []v1alpha1.TaskSpec, basePath string) (patch []patchOperat
return patch
}

func mutateJobVersion(jobStatus v1alpha1.JobStatus, basePath string) (patch []patchOperation) {
jobStatus.Version = 1
patch = append(patch, patchOperation{
Op: "replace",
Path: basePath,
Value: jobStatus,
})
return patch
}

func mutateMetadata(metadata metav1.ObjectMeta, basePath string) (patch []patchOperation) {
if len(metadata.Annotations) == 0 {
metadata.Annotations = make(map[string]string)
Expand Down
2 changes: 1 addition & 1 deletion pkg/apis/batch/v1alpha1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ type JobStatus struct {
// +optional
Terminating int32 `json:"terminating,omitempty" protobuf:"bytes,7,opt,name=terminating"`
//Current version of job
Version int32
Version int32 `json:"version,omitempty" protobuf:"bytes,8,opt,name=version"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
7 changes: 7 additions & 0 deletions pkg/controllers/job/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,9 @@ func (jc *jobCache) Run(stopCh <-chan struct{}) {
}

func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
jc.Lock()
defer jc.Unlock()

var taskReplicas, completed int32

jobInfo, found := jc.jobs[jobKey]
Expand All @@ -244,6 +247,10 @@ func (jc jobCache) TaskCompleted(jobKey, taskName string) bool {
return false
}

if jobInfo.Job == nil {
return false
}

for _, task := range jobInfo.Job.Spec.Tasks {
if task.Name == taskName {
taskReplicas = task.Replicas
Expand Down
10 changes: 5 additions & 5 deletions pkg/controllers/job/job_controller_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,18 @@ func (cc *Controller) updateJob(oldObj, newObj interface{}) {
return
}

if err := cc.cache.Update(newJob); err != nil {
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}

//NOTE: Since we only reconcile job based on Spec, we will ignore other attributes
// For Job status, it's used internally and always been updated via our controller.
if reflect.DeepEqual(newJob.Spec, oldJob.Spec) {
glog.Infof("Job update event is ignored since no update in 'Spec'.")
return
}

if err := cc.cache.Update(newJob); err != nil {
glog.Errorf("Failed to update job <%s/%s>: %v in cache",
newJob.Namespace, newJob.Name, err)
}

req := apis.Request{
Namespace: newJob.Namespace,
JobName: newJob.Name,
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
}

//For all the requests triggered from discarded job resources will perform sync action instead
if req.JobVersion > 0 && req.JobVersion < job.Status.Version {
if req.JobVersion < job.Status.Version {
glog.Infof("Request %s is outdated, will perform sync instead.", req)
return vkv1.SyncJobAction
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,9 @@ func preempt(
}
preempted.Add(preemptee.Resreq)
// If reclaimed enough resources, break loop to avoid Sub panic.
if resreq.LessEqual(preemptee.Resreq) {
if resreq.LessEqual(preempted) {
break
}
resreq.Sub(preemptee.Resreq)
}

metrics.RegisterPreemptionAttempts()
Expand Down
3 changes: 1 addition & 2 deletions pkg/scheduler/actions/reclaim/reclaim.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ func (alloc *reclaimAction) Execute(ssn *framework.Session) {
}
reclaimed.Add(reclaimee.Resreq)
// If reclaimed enough resources, break loop to avoid Sub panic.
if resreq.LessEqual(reclaimee.Resreq) {
if resreq.LessEqual(reclaimed) {
break
}
resreq.Sub(reclaimee.Resreq)
}

glog.V(3).Infof("Reclaimed <%v> for task <%s/%s> requested <%v>.",
Expand Down
125 changes: 125 additions & 0 deletions test/e2e/admission.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
Copyright 2019 The Volcano Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
)

var _ = Describe("Job E2E Test: Test Admission service", func() {
It("Duplicated Task Name", func() {
jobName := "job-duplicated"
namespace := "test"
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)

_, err := createJobInner(context, &jobSpec{
namespace: namespace,
name: jobName,
tasks: []taskSpec{
{
img: defaultNginxImage,
req: oneCPU,
min: rep,
rep: rep,
name: "duplicated",
},
{
img: defaultNginxImage,
req: oneCPU,
min: rep,
rep: rep,
name: "duplicated",
},
},
})
Expect(err).To(HaveOccurred())
stError, ok := err.(*errors.StatusError)
Expect(ok).To(Equal(true))
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated task name"))
})

It("Duplicated Policy Event", func() {
jobName := "job-policy-duplicated"
namespace := "test"
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)

_, err := createJobInner(context, &jobSpec{
namespace: namespace,
name: jobName,
tasks: []taskSpec{
{
img: defaultNginxImage,
req: oneCPU,
min: rep,
rep: rep,
name: "taskname",
},
},
policies: []v1alpha1.LifecyclePolicy{
{
Event: v1alpha1.PodFailedEvent,
Action: v1alpha1.AbortJobAction,
},
{
Event: v1alpha1.PodFailedEvent,
Action: v1alpha1.RestartJobAction,
},
},
})
Expect(err).To(HaveOccurred())
stError, ok := err.(*errors.StatusError)
Expect(ok).To(Equal(true))
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
Expect(stError.ErrStatus.Message).To(ContainSubstring("duplicated job event policies"))
})

It("Min Available illegal", func() {
jobName := "job-min-illegal"
namespace := "test"
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)

_, err := createJobInner(context, &jobSpec{
min: rep * 2,
namespace: namespace,
name: jobName,
tasks: []taskSpec{
{
img: defaultNginxImage,
req: oneCPU,
min: rep,
rep: rep,
name: "taskname",
},
},
})
Expect(err).To(HaveOccurred())
stError, ok := err.(*errors.StatusError)
Expect(ok).To(Equal(true))
Expect(stError.ErrStatus.Code).To(Equal(int32(500)))
Expect(stError.ErrStatus.Message).To(ContainSubstring("'minAvailable' should not be greater than total replicas in tasks"))
})
})
24 changes: 17 additions & 7 deletions test/e2e/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,11 @@ func cleanupTestContext(cxt *context) {
Expect(err).NotTo(HaveOccurred())

// Wait for namespace deleted.
err = wait.Poll(100*time.Millisecond, oneMinute, namespaceNotExist(cxt))
err = wait.Poll(100*time.Millisecond, twoMinute, namespaceNotExist(cxt))
Expect(err).NotTo(HaveOccurred())

// Wait for queues deleted
err = wait.Poll(100*time.Millisecond, oneMinute, queueNotExist(cxt))
err = wait.Poll(100*time.Millisecond, twoMinute, queueNotExist(cxt))
Expect(err).NotTo(HaveOccurred())
}

Expand Down Expand Up @@ -294,6 +294,7 @@ type jobSpec struct {
queue string
tasks []taskSpec
policies []vkv1.LifecyclePolicy
min int32
}

func getNS(context *context, job *jobSpec) string {
Expand All @@ -311,6 +312,14 @@ func getNS(context *context, job *jobSpec) string {
}

func createJob(context *context, jobSpec *jobSpec) *vkv1.Job {

job, err := createJobInner(context, jobSpec)
Expect(err).NotTo(HaveOccurred())

return job
}

func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) {
ns := getNS(context, jobSpec)

job := &vkv1.Job{
Expand Down Expand Up @@ -366,12 +375,13 @@ func createJob(context *context, jobSpec *jobSpec) *vkv1.Job {
min += task.min
}

job.Spec.MinAvailable = min

job, err := context.vkclient.BatchV1alpha1().Jobs(job.Namespace).Create(job)
Expect(err).NotTo(HaveOccurred())
if jobSpec.min > 0 {
job.Spec.MinAvailable = jobSpec.min
} else {
job.Spec.MinAvailable = min
}

return job
return context.vkclient.BatchV1alpha1().Jobs(job.Namespace).Create(job)
}

func taskPhase(ctx *context, job *vkv1.Job, phase []v1.PodPhase, taskNum int) wait.ConditionFunc {
Expand Down

0 comments on commit a045f24

Please sign in to comment.