From bf6602b4002be54f068c9992cf2c41a60c123b7a Mon Sep 17 00:00:00 2001 From: Klaus Ma Date: Sun, 15 Dec 2019 23:17:11 +0800 Subject: [PATCH] Added hosts into environment. Signed-off-by: Klaus Ma --- pkg/controllers/job/plugins/svc/const.go | 4 ++ pkg/controllers/job/plugins/svc/svc.go | 50 +++++++++++++++++++++--- test/e2e/job_plugins.go | 33 ++++++++++++---- 3 files changed, 74 insertions(+), 13 deletions(-) diff --git a/pkg/controllers/job/plugins/svc/const.go b/pkg/controllers/job/plugins/svc/const.go index 526500468ae..6dd7ea5348d 100644 --- a/pkg/controllers/job/plugins/svc/const.go +++ b/pkg/controllers/job/plugins/svc/const.go @@ -19,6 +19,10 @@ package svc const ( // ConfigMapTaskHostFmt key in config map ConfigMapTaskHostFmt = "%s.host" + // EnvTaskHostFmt is the key for host list in environment + EnvTaskHostFmt = "VC_%s_HOSTS" + // EnvHostNumFmt is the key for host number in environment + EnvHostNumFmt = "VC_%s_NUM" // ConfigMapMountPath mount path ConfigMapMountPath = "/etc/volcano" diff --git a/pkg/controllers/job/plugins/svc/svc.go b/pkg/controllers/job/plugins/svc/svc.go index 54c79a47a81..98b64f2e035 100644 --- a/pkg/controllers/job/plugins/svc/svc.go +++ b/pkg/controllers/job/plugins/svc/svc.go @@ -19,6 +19,7 @@ package svc import ( "flag" "fmt" + "strconv" "strings" "k8s.io/klog" @@ -88,6 +89,34 @@ func (sp *servicePlugin) OnPodCreate(pod *v1.Pod, job *batch.Job) error { pod.Spec.Subdomain = job.Name } + var hostEnv []v1.EnvVar + var envNames []string + + for _, ts := range job.Spec.Tasks { + // TODO(k82cn): The splitter and the prefix of env should be configurable. + envNames = append(envNames, fmt.Sprintf(EnvTaskHostFmt, strings.ToUpper(ts.Name))) + envNames = append(envNames, fmt.Sprintf(EnvHostNumFmt, strings.ToUpper(ts.Name))) + } + + for _, name := range envNames { + hostEnv = append(hostEnv, v1.EnvVar{ + Name: name, + ValueFrom: &v1.EnvVarSource{ + ConfigMapKeyRef: &v1.ConfigMapKeySelector{ + LocalObjectReference: v1.LocalObjectReference{Name: sp.cmName(job)}, + Key: name, + }}}, + ) + } + + for i := range pod.Spec.Containers { + pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env, hostEnv...) + } + + for i := range pod.Spec.InitContainers { + pod.Spec.InitContainers[i].Env = append(pod.Spec.InitContainers[i].Env, hostEnv...) + } + sp.mountConfigmap(pod, job) return nil @@ -98,9 +127,10 @@ func (sp *servicePlugin) OnJobAdd(job *batch.Job) error { return nil } - data := generateHost(job) + hostFile := generateHosts(job) - if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, data, sp.cmName(job)); err != nil { + // Create ConfigMap of hosts for Pods to mount. + if err := helpers.CreateConfigMapIfNotExist(job, sp.Clientset.KubeClients, hostFile, sp.cmName(job)); err != nil { return err } @@ -258,8 +288,8 @@ func (sp *servicePlugin) cmName(job *batch.Job) string { return fmt.Sprintf("%s-%s", job.Name, sp.Name()) } -func generateHost(job *batch.Job) map[string]string { - data := make(map[string]string, len(job.Spec.Tasks)) +func generateHosts(job *batch.Job) map[string]string { + hostFile := make(map[string]string, len(job.Spec.Tasks)) for _, ts := range job.Spec.Tasks { hosts := make([]string, 0, ts.Replicas) @@ -280,8 +310,16 @@ func generateHost(job *batch.Job) map[string]string { } key := fmt.Sprintf(ConfigMapTaskHostFmt, ts.Name) - data[key] = strings.Join(hosts, "\n") + hostFile[key] = strings.Join(hosts, "\n") + + // TODO(k82cn): The splitter and the prefix of env should be configurable. + // export hosts as environment + key = fmt.Sprintf(EnvTaskHostFmt, strings.ToUpper(ts.Name)) + hostFile[key] = strings.Join(hosts, ",") + // export host number as environment. + key = fmt.Sprintf(EnvHostNumFmt, strings.ToUpper(ts.Name)) + hostFile[key] = strconv.Itoa(len(hosts)) } - return data + return hostFile } diff --git a/test/e2e/job_plugins.go b/test/e2e/job_plugins.go index d8fa27e14e1..0d78c95637e 100644 --- a/test/e2e/job_plugins.go +++ b/test/e2e/job_plugins.go @@ -18,13 +18,18 @@ package e2e import ( "fmt" + "strings" + . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" + cv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/scheduler/api" + "volcano.sh/volcano/pkg/controllers/job/helpers" "volcano.sh/volcano/pkg/controllers/job/plugins/env" + "volcano.sh/volcano/pkg/controllers/job/plugins/svc" ) var _ = Describe("Job E2E Test: Test Job Plugins", func() { @@ -222,16 +227,30 @@ var _ = Describe("Job E2E Test: Test Job Plugins", func() { } Expect(foundVolume).To(BeTrue()) - // Check whether env exists in the pod - for _, container := range pod.Spec.Containers { - for _, envi := range container.Env { - if envi.Name == env.TaskVkIndex { - foundEnv = true - break + // Check whether env exists in the containers and initContainers + containers := pod.Spec.Containers + containers = append(containers, pod.Spec.InitContainers...) + envNames := []string{ + env.TaskVkIndex, + env.TaskIndex, + fmt.Sprintf(svc.EnvTaskHostFmt, strings.ToUpper(taskName)), + fmt.Sprintf(svc.EnvHostNumFmt, strings.ToUpper(taskName)), + } + + for _, container := range containers { + for _, name := range envNames { + foundEnv = false + for _, envi := range container.Env { + if envi.Name == name { + foundEnv = true + break + } } + + Expect(foundEnv).To(BeTrue(), + fmt.Sprint("container: %s, env name: %s", container.Name, name)) } } - Expect(foundEnv).To(BeTrue()) // Check whether service is created with job name _, err = context.kubeclient.CoreV1().Services(job.Namespace).Get(job.Name, v1.GetOptions{})