diff --git a/example/integrations/tensorflow/Dockerfile b/example/integrations/tensorflow/benchmark/Dockerfile similarity index 100% rename from example/integrations/tensorflow/Dockerfile rename to example/integrations/tensorflow/benchmark/Dockerfile diff --git a/example/integrations/tensorflow/tf-example.yaml b/example/integrations/tensorflow/benchmark/tf-example.yaml similarity index 100% rename from example/integrations/tensorflow/tf-example.yaml rename to example/integrations/tensorflow/benchmark/tf-example.yaml diff --git a/example/integrations/tensorflow/dist-mnist/tf-dist-mnist-example.yaml b/example/integrations/tensorflow/dist-mnist/tf-dist-mnist-example.yaml new file mode 100644 index 0000000000..aea35cd13d --- /dev/null +++ b/example/integrations/tensorflow/dist-mnist/tf-dist-mnist-example.yaml @@ -0,0 +1,57 @@ +apiVersion: batch.volcano.sh/v1alpha1 +kind: Job +metadata: + name: tensorflow-dist-mnist +spec: + minAvailable: 3 + schedulerName: volcano + plugins: + env: [] + svc: [] + policies: + - event: PodEvicted + action: RestartJob + tasks: + - replicas: 1 + name: ps + template: + spec: + containers: + - command: + - sh + - -c + - | + PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`; + WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`; + export TF_CONFIG={\"cluster\":{\"ps\":[${PS_HOST}],\"worker\":[${WORKER_HOST}]},\"task\":{\"type\":\"ps\",\"index\":${VK_TASK_INDEX}},\"environment\":\"cloud\"}; + python /var/tf_dist_mnist/dist_mnist.py + image: volcanosh/dist-mnist-tf-example:0.0.1 + name: tensorflow + ports: + - containerPort: 2222 + name: tfjob-port + resources: {} + restartPolicy: Never + - replicas: 2 + name: worker + policies: + - event: TaskCompleted + action: CompleteJob + template: + spec: + containers: + - command: + - sh + - -c + - | + PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`; + WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/"/;s/$/"/' | tr "\n" ","`; + export TF_CONFIG={\"cluster\":{\"ps\":[${PS_HOST}],\"worker\":[${WORKER_HOST}]},\"task\":{\"type\":\"worker\",\"index\":${VK_TASK_INDEX}},\"environment\":\"cloud\"}; + python /var/tf_dist_mnist/dist_mnist.py + image: volcanosh/dist-mnist-tf-example:0.0.1 + name: tensorflow + ports: + - containerPort: 2222 + name: tfjob-port + resources: {} + restartPolicy: Never \ No newline at end of file diff --git a/hack/run-e2e-kind.sh b/hack/run-e2e-kind.sh index 3f5946d3fa..1c7885d52d 100755 --- a/hack/run-e2e-kind.sh +++ b/hack/run-e2e-kind.sh @@ -6,6 +6,7 @@ export LOG_LEVEL=3 export SHOW_VOLCANO_LOGS=${SHOW_VOLCANO_LOGS:-1} export CLEANUP_CLUSTER=${CLEANUP_CLUSTER:-1} export MPI_EXAMPLE_IMAGE=${MPI_EXAMPLE_IMAGE:-"volcanosh/example-mpi:0.0.1"} +export TF_EXAMPLE_IMAGE=${TF_EXAMPLE_IMAGE:-"volcanosh/dist-mnist-tf-example:0.0.1"} if [[ "${CLUSTER_NAME}xxx" == "xxx" ]];then CLUSTER_NAME="integration" @@ -33,9 +34,11 @@ function install-volcano { echo "Pulling required docker images" docker pull ${MPI_EXAMPLE_IMAGE} + docker pull ${TF_EXAMPLE_IMAGE} echo "Loading docker images into kind cluster" kind load docker-image ${MPI_EXAMPLE_IMAGE} ${CLUSTER_CONTEXT} + kind load docker-image ${TF_EXAMPLE_IMAGE} ${CLUSTER_CONTEXT} echo "Install volcano chart" helm install installer/helm/chart/volcano --namespace kube-system --name ${CLUSTER_NAME} --kubeconfig ${KUBECONFIG} --set basic.image_tag_version=${TAG} --set basic.scheduler_config_file=volcano-scheduler-ci.conf --wait diff --git a/test/e2e/tensorflow.go b/test/e2e/tensorflow.go new file mode 100644 index 0000000000..67100c70aa --- /dev/null +++ b/test/e2e/tensorflow.go @@ -0,0 +1,123 @@ +/* +Copyright 2019 The Volcano Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + vkv1 "volcano.sh/volcano/pkg/apis/batch/v1alpha1" +) + +var _ = Describe("TensorFlow E2E Test", func() { + It("Will Start in pending state and goes through other phases to get complete phase", func() { + context := initTestContext() + defer cleanupTestContext(context) + + jobName := "tensorflow-dist-mnist" + + job := &vkv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: jobName, + }, + Spec: vkv1.JobSpec{ + MinAvailable: int32(3), + SchedulerName: schedulerName, + Plugins: map[string][]string{ + "svc": {}, + "env": {}, + }, + Policies: []vkv1.LifecyclePolicy{ + { + Event: vkv1.PodEvictedEvent, + Action: vkv1.RestartJobAction, + }, + }, + Tasks: []vkv1.TaskSpec{ + { + Replicas: int32(1), + Name: "ps", + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Command: []string{ + "sh", + "-c", + "PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; export TF_CONFIG={\\\"cluster\\\":{\\\"ps\\\":[${PS_HOST}],\\\"worker\\\":[${WORKER_HOST}]},\\\"task\\\":{\\\"type\\\":\\\"ps\\\",\\\"index\\\":${VK_TASK_INDEX}},\\\"environment\\\":\\\"cloud\\\"}; echo ${TF_CONFIG}; python /var/tf_dist_mnist/dist_mnist.py --train_steps 1000", + }, + Image: defaultTFImage, + Name: "tensorflow", + Ports: []v1.ContainerPort{ + { + Name: "tfjob-port", + ContainerPort: int32(2222), + }, + }, + }, + }, + }, + }, + }, + { + Replicas: int32(2), + Name: "worker", + Policies: []vkv1.LifecyclePolicy{ + { + Event: vkv1.TaskCompletedEvent, + Action: vkv1.CompleteJobAction, + }, + }, + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + { + Command: []string{ + "sh", + "-c", + "PS_HOST=`cat /etc/volcano/ps.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; WORKER_HOST=`cat /etc/volcano/worker.host | sed 's/$/&:2222/g' | sed 's/^/\"/;s/$/\"/' | tr \"\n\" \",\"`; export TF_CONFIG={\\\"cluster\\\":{\\\"ps\\\":[${PS_HOST}],\\\"worker\\\":[${WORKER_HOST}]},\\\"task\\\":{\\\"type\\\":\\\"worker\\\",\\\"index\\\":${VK_TASK_INDEX}},\\\"environment\\\":\\\"cloud\\\"}; echo ${TF_CONFIG}; python /var/tf_dist_mnist/dist_mnist.py --train_steps 1000", + }, + Image: defaultTFImage, + Name: "tensorflow", + Ports: []v1.ContainerPort{ + { + Name: "tfjob-port", + ContainerPort: int32(2222), + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + created, err := context.vcclient.BatchV1alpha1().Jobs("test").Create(job) + Expect(err).NotTo(HaveOccurred()) + + err = waitJobStates(context, created, []vkv1.JobPhase{vkv1.Pending, vkv1.Inqueue, vkv1.Running, vkv1.Completed}, twoMinute) + Expect(err).NotTo(HaveOccurred()) + }) + +}) diff --git a/test/e2e/util.go b/test/e2e/util.go index b3a4f3afe9..837a97c9a0 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -64,6 +64,7 @@ const ( defaultMPIImage = "volcanosh/example-mpi:0.0.1" schedulerName = "volcano" executeAction = "ExecuteAction" + defaultTFImage = "volcanosh/dist-mnist-tf-example:0.0.1" defaultNamespace = "test" defaultQueue1 = "q1" @@ -576,9 +577,9 @@ func waitJobPhases(ctx *context, job *batchv1alpha1.Job, phases []batchv1alpha1. return nil } -func waitJobStates(ctx *context, job *batchv1alpha1.Job, phases []batchv1alpha1.JobPhase) error { +func waitJobStates(ctx *context, job *batchv1alpha1.Job, phases []batchv1alpha1.JobPhase, waitTime time.Duration) error { for _, phase := range phases { - err := waitJobPhaseExpect(ctx, job, phase) + err := waitJobPhaseExpect(ctx, job, phase, waitTime) if err != nil { return err } @@ -666,22 +667,22 @@ func waitTasksPending(ctx *context, job *batchv1alpha1.Job, taskNum int) error { } func waitJobStateReady(ctx *context, job *batchv1alpha1.Job) error { - return waitJobPhaseExpect(ctx, job, batchv1alpha1.Running) + return waitJobPhaseExpect(ctx, job, batchv1alpha1.Running, oneMinute) } func waitJobStatePending(ctx *context, job *batchv1alpha1.Job) error { - return waitJobPhaseExpect(ctx, job, batchv1alpha1.Pending) + return waitJobPhaseExpect(ctx, job, batchv1alpha1.Pending, oneMinute) } func waitJobStateInqueue(ctx *context, job *batchv1alpha1.Job) error { - return waitJobPhaseExpect(ctx, job, batchv1alpha1.Inqueue) + return waitJobPhaseExpect(ctx, job, batchv1alpha1.Inqueue, oneMinute) } func waitJobStateAborted(ctx *context, job *batchv1alpha1.Job) error { - return waitJobPhaseExpect(ctx, job, batchv1alpha1.Aborted) + return waitJobPhaseExpect(ctx, job, batchv1alpha1.Aborted, oneMinute) } -func waitJobPhaseExpect(ctx *context, job *batchv1alpha1.Job, state batchv1alpha1.JobPhase) error { +func waitJobPhaseExpect(ctx *context, job *batchv1alpha1.Job, state batchv1alpha1.JobPhase, waitTime time.Duration) error { var additionalError error err := wait.Poll(100*time.Millisecond, oneMinute, func() (bool, error) { job, err := ctx.vcclient.BatchV1alpha1().Jobs(job.Namespace).Get(job.Name, metav1.GetOptions{})