From 96a4a82ba955fa6fccf709d5f664118d9f82d658 Mon Sep 17 00:00:00 2001
From: Thandayuthapani
Date: Tue, 23 Jul 2019 19:26:19 +0530
Subject: [PATCH] Add E2E for TensorFlow integration with Volcano

Add a Ginkgo e2e test that submits a distributed TensorFlow MNIST job
(one "ps" replica and two "worker" replicas) and waits for it to pass
through the Pending, Inqueue, Running and Completed phases.

waitJobStates and waitJobPhaseExpect now take the polling timeout as a
parameter; the existing single-phase helpers keep passing oneMinute and
the TensorFlow test passes twoMinute.

hack/run-e2e-kind.sh pulls the TensorFlow example image and loads it
into the kind cluster. Its default image is the same one the test pods
run (defaultTFImage in test/e2e/util.go), so the image that gets
preloaded is the image that gets used.

---
 hack/run-e2e-kind.sh   |   3 +
 test/e2e/tensorflow.go | 123 +++++++++++++++++++++++++++++++++++++++++
 test/e2e/util.go       |  17 +++---
 3 files changed, 135 insertions(+), 8 deletions(-)
 create mode 100644 test/e2e/tensorflow.go

diff --git a/hack/run-e2e-kind.sh b/hack/run-e2e-kind.sh
index 3f5946d3fa3..c623766ddcf 100755
--- a/hack/run-e2e-kind.sh
+++ b/hack/run-e2e-kind.sh
@@ -6,6 +6,7 @@ export LOG_LEVEL=3
 export SHOW_VOLCANO_LOGS=${SHOW_VOLCANO_LOGS:-1}
 export CLEANUP_CLUSTER=${CLEANUP_CLUSTER:-1}
 export MPI_EXAMPLE_IMAGE=${MPI_EXAMPLE_IMAGE:-"volcanosh/example-mpi:0.0.1"}
+export TF_EXAMPLE_IMAGE=${TF_EXAMPLE_IMAGE:-"volcanosh/dist-mnist-tf-example:0.0.1"}
 
 if [[ "${CLUSTER_NAME}xxx" == "xxx" ]];then
   CLUSTER_NAME="integration"
@@ -33,9 +34,11 @@ function install-volcano {
 
   echo "Pulling required docker images"
   docker pull ${MPI_EXAMPLE_IMAGE}
+  docker pull ${TF_EXAMPLE_IMAGE}
 
   echo "Loading docker images into kind cluster"
   kind load docker-image ${MPI_EXAMPLE_IMAGE} ${CLUSTER_CONTEXT}
+  kind load docker-image ${TF_EXAMPLE_IMAGE} ${CLUSTER_CONTEXT}
 
   echo "Install volcano chart"
   helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=volcano-scheduler-ci.conf --wait
diff --git a/test/e2e/tensorflow.go b/test/e2e/tensorflow.go
new file mode 100644
index 00000000000..4d28a9aae39
--- /dev/null
+++ b/test/e2e/tensorflow.go
@@ -0,0 +1,123 @@
+/*
+Copyright 2019 The Volcano Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2e
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+	vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1"
+)
+
+var _ = Describe("TensorFlow E2E Test", func() {
+	It("Will Start in pending state and goes through other phases to get complete phase", func() {
+		context := initTestContext()
+		defer cleanupTestContext(context)
+
+		jobName := "tensorflow-dist-mnist"
+
+		job := &vkv1.Job{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: jobName,
+			},
+			Spec: vkv1.JobSpec{
+				MinAvailable:  int32(3),
+				SchedulerName: schedulerName,
+				Plugins: map[string][]string{
+					"svc": {},
+					"env": {},
+				},
+				Policies: []vkv1.LifecyclePolicy{
+					{
+						Event:  vkv1.PodEvictedEvent,
+						Action: vkv1.RestartJobAction,
+					},
+				},
+				Tasks: []vkv1.TaskSpec{
+					{
+						Replicas: int32(1),
+						Name:     "ps",
+						Template: v1.PodTemplateSpec{
+							Spec: v1.PodSpec{
+								RestartPolicy: v1.RestartPolicyNever,
+								Containers: []v1.Container{
+									{
+										Command: []string{
+											"sh",
+											"-c",
+											"PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; export TF_CONFIG={\\\"cluster\\\":{\\\"ps\\\":[${PS_HOST}],\\\"worker\\\":[${WORKER_HOST}]},\\\"task\\\":{\\\"type\\\":\\\"ps\\\",\\\"index\\\":${VK_TASK_INDEX}},\\\"environment\\\":\\\"cloud\\\"}; echo ${TF_CONFIG}; python /var/tf_dist_mnist/dist_mnist.py --train_steps 1000",
+										},
+										Image: defaultTFImage,
+										Name:  "tensorflow",
+										Ports: []v1.ContainerPort{
+											{
+												Name:          "tfjob-port",
+												ContainerPort: int32(2222),
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+					{
+						Replicas: int32(2),
+						Name:     "worker",
+						Policies: []vkv1.LifecyclePolicy{
+							{
+								Event:  vkv1.TaskCompletedEvent,
+								Action: vkv1.CompleteJobAction,
+							},
+						},
+						Template: v1.PodTemplateSpec{
+							Spec: v1.PodSpec{
+								RestartPolicy: v1.RestartPolicyNever,
+								Containers: []v1.Container{
+									{
+										Command: []string{
+											"sh",
+											"-c",
+											"PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; export TF_CONFIG={\\\"cluster\\\":{\\\"ps\\\":[${PS_HOST}],\\\"worker\\\":[${WORKER_HOST}]},\\\"task\\\":{\\\"type\\\":\\\"worker\\\",\\\"index\\\":${VK_TASK_INDEX}},\\\"environment\\\":\\\"cloud\\\"}; echo ${TF_CONFIG}; python /var/tf_dist_mnist/dist_mnist.py --train_steps 1000",
+										},
+										Image: defaultTFImage,
+										Name:  "tensorflow",
+										Ports: []v1.ContainerPort{
+											{
+												Name:          "tfjob-port",
+												ContainerPort: int32(2222),
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		}
+
+		created, err := context.vkclient.BatchV1alpha1().Jobs(defaultNamespace).Create(job)
+		Expect(err).NotTo(HaveOccurred())
+
+		err = waitJobStates(context, created, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Completed}, twoMinute)
+		Expect(err).NotTo(HaveOccurred())
+	})
+
+})
diff --git a/test/e2e/util.go b/test/e2e/util.go
index a486f8f161c..a27acf8e3be 100644
--- a/test/e2e/util.go
+++ b/test/e2e/util.go
@@ -65,6 +65,7 @@ const (
 	defaultBusyBoxImage = "busybox:1.24"
 	defaultMPIImage     = "volcanosh/example-mpi:0.0.1"
 	schedulerName       = "volcano"
+	defaultTFImage      = "volcanosh/dist-mnist-tf-example:0.0.1"
 
 	defaultNamespace = "test"
 	defaultQueue1    = "q1"
@@ -542,9 +543,9 @@ func waitJobPhases(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
 	return nil
 }
 
-func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase) error {
+func waitJobStates(ctx *context, job *vkv1.Job, phases []vkv1.JobPhase, waitTime time.Duration) error {
 	for _, phase := range phases {
-		err := waitJobPhaseExpect(ctx, job, phase)
+		err := waitJobPhaseExpect(ctx, job, phase, waitTime)
 		if err != nil {
 			return err
 		}
@@ -628,24 +629,24 @@ func waitTasksPending(ctx *context, job *vkv1.Job, taskNum int) error {
 }
 
 func waitJobStateReady(ctx *context, job *vkv1.Job) error {
-	return waitJobPhaseExpect(ctx, job, vkv1.Running)
+	return waitJobPhaseExpect(ctx, job, vkv1.Running, oneMinute)
 }
 
 func waitJobStatePending(ctx *context, job *vkv1.Job) error {
-	return waitJobPhaseExpect(ctx, job, vkv1.Pending)
+	return waitJobPhaseExpect(ctx, job, vkv1.Pending, oneMinute)
 }
 
 func waitJobStateInqueue(ctx *context, job *vkv1.Job) error {
-	return waitJobPhaseExpect(ctx, job, vkv1.Inqueue)
+	return waitJobPhaseExpect(ctx, job, vkv1.Inqueue, oneMinute)
 }
 
 func waitJobStateAborted(ctx *context, job *vkv1.Job) error {
-	return waitJobPhaseExpect(ctx, job, vkv1.Aborted)
+	return waitJobPhaseExpect(ctx, job, vkv1.Aborted, oneMinute)
 }
 
-func waitJobPhaseExpect(ctx *context, job *vkv1.Job, state vkv1.JobPhase) error {
+func waitJobPhaseExpect(ctx *context, job *vkv1.Job, state vkv1.JobPhase, waitTime time.Duration) error {
 	var additionalError error
-	err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) {
+	err := wait.Poll(100*time.Millisecond, waitTime, func() (bool, error) {
 		job, err := ctx.vkclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})
 		Expect(err).NotTo(HaveOccurred())
 		expected := job.Status.State.Phase == state