diff --git a/pkg/controllers/job/job_controller_actions.go b/pkg/controllers/job/job_controller_actions.go
index 94017c11b6..f3d2f21e37 100644
--- a/pkg/controllers/job/job_controller_actions.go
+++ b/pkg/controllers/job/job_controller_actions.go
@@ -28,7 +28,6 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kbv1 "github.com/kubernetes-sigs/kube-batch/pkg/apis/scheduling/v1alpha1"
-	kbapi "github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
 	vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
 	"volcano.sh/volcano/pkg/apis/helpers"
 	"volcano.sh/volcano/pkg/controllers/apis"
@@ -492,7 +491,7 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {
 
 	sort.Sort(tasksPriority)
 
-	minAvailableTasksRes := kbapi.EmptyResource()
+	minAvailableTasksRes := v1.ResourceList{}
 	podCnt := int32(0)
 	for _, task := range tasksPriority {
 		for i := int32(0); i < task.Replicas; i++ {
@@ -501,10 +500,10 @@ func (cc *Controller) calcPGMinResources(job *vkv1.Job) *v1.ResourceList {
 			}
 			podCnt++
 			for _, c := range task.Template.Spec.Containers {
-				minAvailableTasksRes.Add(kbapi.NewResource(c.Resources.Requests))
+				addResourceList(minAvailableTasksRes, c.Resources.Requests)
 			}
 		}
 	}
 
-	return minAvailableTasksRes.Convert2K8sResource()
+	return &minAvailableTasksRes
 }
diff --git a/pkg/controllers/job/job_controller_util.go b/pkg/controllers/job/job_controller_util.go
index 70f96616e0..22c7458923 100644
--- a/pkg/controllers/job/job_controller_util.go
+++ b/pkg/controllers/job/job_controller_util.go
@@ -187,6 +187,18 @@ func applyPolicies(job *vkv1.Job, req *apis.Request) vkv1.Action {
 	return vkv1.SyncJobAction
 }
 
+// addResourceList adds each quantity in req to the matching entry in list.
+func addResourceList(list, req v1.ResourceList) {
+	for name, quantity := range req {
+		if value, ok := list[name]; !ok {
+			list[name] = *quantity.Copy()
+		} else {
+			value.Add(quantity)
+			list[name] = value
+		}
+	}
+}
+
 type TaskPriority struct {
 	priority int32
 
diff --git a/test/e2e/job_controlled_resource.go b/test/e2e/job_controlled_resource.go
index f6277f872d..b1f3ffb0fe 100644
--- a/test/e2e/job_controlled_resource.go
+++ b/test/e2e/job_controlled_resource.go
@@ -19,6 +19,8 @@ package e2e
 import (
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/resource"
 	"k8s.io/apimachinery/pkg/apis/meta/v1"
 	"volcano.sh/volcano/pkg/apis/batch/v1alpha1"
 )
@@ -68,4 +70,59 @@ var _ = Describe("Job E2E Test: Test Job PVCs", func() {
 				"PVC name should be generated for manually specified.")
 		}
 	})
+
+	It("Generate PodGroup and valid minResource when creating job", func() {
+		jobName := "job-name-podgroup"
+		namespace := "test"
+		context := initTestContext()
+		defer cleanupTestContext(context)
+
+		taskResource := corev1.ResourceList{
+			"cpu":            resource.MustParse("1000m"),
+			"memory":         resource.MustParse("1000Mi"),
+			"nvidia.com/gpu": resource.MustParse("1"),
+		}
+
+		job := createJob(context, &jobSpec{
+			namespace: namespace,
+			name:      jobName,
+			tasks: []taskSpec{
+				{
+					img:   defaultNginxImage,
+					min:   1,
+					rep:   1,
+					name:  "task-1",
+					req:   taskResource,
+					limit: taskResource,
+				},
+				{
+					img:   defaultNginxImage,
+					min:   1,
+					rep:   1,
+					name:  "task-2",
+					req:   taskResource,
+					limit: taskResource,
+				},
+			},
+		})
+
+		expected := map[string]int64{
+			"cpu":            2,
+			"memory":         1024 * 1024 * 2000,
+			"nvidia.com/gpu": 2,
+		}
+
+		err := waitJobStatePending(context, job)
+		Expect(err).NotTo(HaveOccurred())
+
+		pGroup, err := context.kbclient.SchedulingV1alpha1().PodGroups(namespace).Get(jobName, v1.GetOptions{})
+		Expect(err).NotTo(HaveOccurred())
+
+		for name, q := range *pGroup.Spec.MinResources {
+			value, ok := expected[string(name)]
+			Expect(ok).To(BeTrue(), "Resource %s should exist in PodGroup", name)
+			Expect(q.Value()).To(Equal(value), "Resource %s value should equal %d", name, value)
+		}
+
+	})
 })
diff --git a/test/e2e/util.go b/test/e2e/util.go
index 53e274a9f7..2680568b7e 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -262,6 +262,7 @@ type taskSpec struct {
 	workingDir string
 	hostport   int32
 	req        v1.ResourceList
+	limit      v1.ResourceList
 	affinity   *v1.Affinity
 	labels     map[string]string
 	policies   []vkv1.LifecyclePolicy
@@ -338,7 +339,7 @@ func createJobInner(context *context, jobSpec *jobSpec) (*vkv1.Job, error) {
 					Spec: v1.PodSpec{
 						SchedulerName: "kube-batch",
 						RestartPolicy: restartPolicy,
-						Containers:    createContainers(task.img, task.command, task.workingDir, task.req, task.hostport),
+						Containers:    createContainers(task.img, task.command, task.workingDir, task.req, task.limit, task.hostport),
 						Affinity:      task.affinity,
 					},
 				},
@@ -597,13 +598,14 @@ func waitQueueStatus(condition func() (bool, error)) error {
 	return wait.Poll(100*time.Millisecond, oneMinute, condition)
 }
 
-func createContainers(img, command, workingDir string, req v1.ResourceList, hostport int32) []v1.Container {
+func createContainers(img, command, workingDir string, req, limit v1.ResourceList, hostport int32) []v1.Container {
 	var imageRepo []string
 	container := v1.Container{
 		Image:           img,
 		ImagePullPolicy: v1.PullIfNotPresent,
 		Resources: v1.ResourceRequirements{
 			Requests: req,
+			Limits:   limit,
 		},
 	}
 	if strings.Index(img, ":") < 0 {