From befcf0a30651d0335bb57c242a824e43748db33f Mon Sep 17 00:00:00 2001 From: Ilan Filonenko Date: Mon, 3 Jul 2017 12:25:59 -0700 Subject: [PATCH] Python Bindings for launching PySpark Jobs from the JVM (#364) * Adding PySpark Submit functionality. Launching Python from JVM * Addressing Scala idioms related to PR 351 * Removing extends Logging, which was necessary for logInfo * Refactored code to leverage the ContainerLocalizedFilesResolver * Modified Unit tests so that they would pass * Modified Unit Test input to pass Unit Tests * Set up working environment for integration tests for PySpark * Comment out Python thread logic until Jenkins has Python * Modifying PythonExec to pass on Jenkins * Modifying python exec * Added unit tests to ClientV2 and refactored to include pyspark submission resources * Modified unit test check * Scalastyle * PR 348 file conflicts * Refactored unit tests and styles * Further Scala styling and logic * Modified unit tests to be more specific towards the class in question * Removed space delimiting for methods * Submission client redesign to use a step-based builder pattern. This change overhauls the underlying architecture of the submission client, but it is intended to entirely preserve existing behavior of Spark applications. Therefore users will find this to be an invisible change. The philosophy behind this design is to reconsider the breakdown of the submission process. It operates off the abstraction of "submission steps", which are transformation functions that take the previous state of the driver and return the new state of the driver. The driver's state includes its Spark configurations and the Kubernetes resources that will be used to deploy it. Such a refactor moves away from a features-first API design, which considers different containers to serve a set of features. The previous design, for example, had a container files resolver API object that returned different resolutions of the dependencies added by the user. However, it was up to the main Client to know how to intelligently invoke all of those APIs. Therefore the API surface area of the file resolver became untenably large and it was not intuitive how it was to be used or extended. This design changes the encapsulation layout; every module is now responsible for changing the driver specification directly. An orchestrator builds the correct chain of steps and hands it to the client, which then calls it verbatim. The main client then makes any final modifications that put the different pieces of the driver together, particularly to attach the driver container itself to the pod and to apply the Spark configuration as command-line arguments (see the sketch below). * Don't add the init-container step if all URIs are local. * Python arguments patch + tests + docs * Revert "Python arguments patch + tests + docs" This reverts commit 4533df2a03e2a8922988b0bd01691ad1f26e5d03. * Revert "Don't add the init-container step if all URIs are local." This reverts commit e103225d9ff54ca17692279cc6a7999f9b8c3265. * Revert "Submission client redesign to use a step-based builder pattern." This reverts commit 5499f6ddf9b42c0526f1dc053317afb38dc71294.
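The step-based design referenced above can be summarized with a small Scala sketch. This is illustrative only and not code from this patch (the redesign is reverted later in this same series); the names `KubernetesDriverSpec`, `DriverConfigurationStep`, and `DriverSpecOrchestrator` are assumed for the sake of the example.

```scala
import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod}

import org.apache.spark.SparkConf

// The driver's "state": its Spark configuration plus the Kubernetes resources
// that will be used to deploy it. (Hypothetical names for illustration.)
case class KubernetesDriverSpec(
    driverPod: Pod,
    driverContainer: Container,
    otherKubernetesResources: Seq[HasMetadata],
    driverSparkConf: SparkConf)

// A submission step is a transformation from the previous driver state to the next one.
trait DriverConfigurationStep {
  def configureDriver(previousSpec: KubernetesDriverSpec): KubernetesDriverSpec
}

// An orchestrator builds the chain of steps; the client applies it verbatim and then
// attaches the driver container to the pod and adds the Spark conf as command-line arguments.
class DriverSpecOrchestrator(steps: Seq[DriverConfigurationStep]) {
  def resolve(initialSpec: KubernetesDriverSpec): KubernetesDriverSpec =
    steps.foldLeft(initialSpec) { (spec, step) => step.configureDriver(spec) }
}
```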
* style changes * space for styling --- README.md | 1 + .../org/apache/spark/deploy/SparkSubmit.scala | 14 +- docs/running-on-kubernetes.md | 26 ++++ .../spark/deploy/kubernetes/constants.scala | 2 + .../deploy/kubernetes/submit/Client.scala | 77 ++++++---- .../ContainerLocalizedFilesResolver.scala | 39 +++-- ...riverInitContainerComponentsProvider.scala | 25 ++-- .../DriverPodKubernetesFileMounter.scala | 55 +++++++ .../submit/PythonSubmissionResources.scala | 75 ++++++++++ .../kubernetes/submit/ClientV2Suite.scala | 139 +++++++++++++++--- ...ContainerLocalizedFilesResolverSuite.scala | 24 +++ .../PythonSubmissionResourcesSuite.scala | 109 ++++++++++++++ .../src/main/docker/driver-py/Dockerfile | 48 ++++++ .../src/main/docker/executor-py/Dockerfile | 46 ++++++ .../src/main/docker/init-container/Dockerfile | 2 +- .../docker/resource-staging-server/Dockerfile | 3 +- .../main/docker/shuffle-service/Dockerfile | 2 +- .../kubernetes/integration-tests/pom.xml | 102 +++++++++++++ .../integration-tests/src/test/python/pi.py | 46 ++++++ .../integrationtest/KubernetesSuite.scala | 40 ++++- .../docker/SparkDockerImageBuilder.scala | 29 +++- 21 files changed, 831 insertions(+), 73 deletions(-) create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala create mode 100644 resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile create mode 100644 resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile create mode 100755 resource-managers/kubernetes/integration-tests/src/test/python/pi.py diff --git a/README.md b/README.md index cf6b4fa80242b..cb747225a11d4 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,7 @@ We've been asked by an Apache Spark Committer to work outside of the Apache infr This is a collaborative effort by several folks from different companies who are interested in seeing this feature be successful. 
Companies active in this project include (alphabetically): +- Bloomberg - Google - Haiwen - Hyperpilot diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 59ccf3af24ce7..9256a9ddd9960 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -335,8 +335,8 @@ object SparkSubmit { (clusterManager, deployMode) match { case (KUBERNETES, CLIENT) => printErrorAndExit("Client mode is currently not supported for Kubernetes.") - case (KUBERNETES, CLUSTER) if args.isPython || args.isR => - printErrorAndExit("Kubernetes does not currently support python or R applications.") + case (KUBERNETES, CLUSTER) if args.isR => + printErrorAndExit("Kubernetes does not currently support R applications.") case (STANDALONE, CLUSTER) if args.isPython => printErrorAndExit("Cluster deploy mode is currently not supported for python " + "applications on standalone clusters.") @@ -620,8 +620,14 @@ object SparkSubmit { if (isKubernetesCluster) { childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client" - childArgs += args.primaryResource - childArgs += args.mainClass + if (args.isPython) { + childArgs += args.primaryResource + childArgs += "org.apache.spark.deploy.PythonRunner" + childArgs += args.pyFiles + } else { + childArgs += args.primaryResource + childArgs += args.mainClass + } childArgs ++= args.childArgs } diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 3a50860f826c5..2b4e9a6f96af1 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -180,6 +180,32 @@ The above mechanism using `kubectl proxy` can be used when we have authenticatio kubernetes-client library does not support. Authentication using X509 Client Certs and OAuth tokens is currently supported. +### Running PySpark + +Running PySpark on Kubernetes leverages the same spark-submit logic that is used when launching on Yarn and Mesos. +Python files can be distributed by passing them to `--py-files`. + +Below is an example submission: + + +``` + bin/spark-submit \ + --deploy-mode cluster \ + --master k8s://http://127.0.0.1:8001 \ + --kubernetes-namespace default \ + --conf spark.executor.memory=500m \ + --conf spark.driver.memory=1G \ + --conf spark.driver.cores=1 \ + --conf spark.executor.cores=1 \ + --conf spark.executor.instances=1 \ + --conf spark.app.name=spark-pi \ + --conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \ + --conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \ + --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \ + --py-files local:///opt/spark/examples/src/main/python/sort.py \ + local:///opt/spark/examples/src/main/python/pi.py 100 +``` + ## Dynamic Executor Scaling Spark on Kubernetes supports Dynamic Allocation with cluster mode.
This mode requires running diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala index f2f1136e54fe4..92f051b2ac298 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala @@ -67,6 +67,8 @@ package object constants { private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS" private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS" private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR" + private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES" + private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY" // Bootstrapping dependencies with the init-container private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala index 8220127eac449..781ecbd6c5416 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala @@ -47,11 +47,11 @@ private[spark] class Client( appName: String, kubernetesResourceNamePrefix: String, kubernetesAppId: String, + mainAppResource: String, + pythonResource: Option[PythonSubmissionResourcesImpl], mainClass: String, sparkConf: SparkConf, appArgs: Array[String], - sparkJars: Seq[String], - sparkFiles: Seq[String], waitForAppCompletion: Boolean, kubernetesClient: KubernetesClient, initContainerComponentsProvider: DriverInitContainerComponentsProvider, @@ -82,9 +82,7 @@ private[spark] class Client( org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) def run(): Unit = { - validateNoDuplicateFileNames(sparkJars) - validateNoDuplicateFileNames(sparkFiles) - + val arguments = (pythonResource map {p => p.arguments}).getOrElse(appArgs) val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX, @@ -136,7 +134,7 @@ private[spark] class Client( .endEnv() .addNewEnv() .withName(ENV_DRIVER_ARGS) - .withValue(appArgs.mkString(" ")) + .withValue(arguments.mkString(" ")) .endEnv() .withNewResources() .addToRequests("cpu", driverCpuQuantity) @@ -182,10 +180,13 @@ private[spark] class Client( .map(_.build()) val containerLocalizedFilesResolver = initContainerComponentsProvider - .provideContainerLocalizedFilesResolver() + .provideContainerLocalizedFilesResolver(mainAppResource) val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() - + val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles() + val resolvedPrimaryPySparkResource = pythonResource.map { + p => p.primaryPySparkResource(containerLocalizedFilesResolver) + }.getOrElse("") val initContainerBundler = initContainerComponentsProvider .provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()), resolvedSparkJars ++ resolvedSparkFiles) @@ -221,7 +222,7 @@ private[spark] class Client( val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { case 
(confKey, confValue) => s"-D$confKey=$confValue" }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") - val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec() + val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec() .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) .addNewEnv() .withName(ENV_MOUNTED_CLASSPATH) @@ -233,7 +234,15 @@ private[spark] class Client( .endEnv() .endContainer() .endSpec() - .build() + val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter() + val resolvedDriverPod = pythonResource.map { + p => p.driverPodWithPySparkEnvs( + driverPodFileMounter, + resolvedPrimaryPySparkResource, + resolvedPySparkFiles.mkString(","), + driverContainer.getName, + resolvedDriverPodBuilder + )}.getOrElse(resolvedDriverPodBuilder.build()) Utils.tryWithResource( kubernetesClient .pods() @@ -271,17 +280,6 @@ private[spark] class Client( } } } - - private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = { - val fileNamesToUris = allFiles.map { file => - (new File(Utils.resolveURI(file).getPath).getName, file) - } - fileNamesToUris.groupBy(_._1).foreach { - case (fileName, urisWithFileName) => - require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" + - s" file name $fileName is shared by all of these URIs: $urisWithFileName") - } - } } private[spark] object Client { @@ -292,22 +290,34 @@ private[spark] object Client { val appArgs = args.drop(2) run(sparkConf, mainAppResource, mainClass, appArgs) } - def run( sparkConf: SparkConf, mainAppResource: String, mainClass: String, appArgs: Array[String]): Unit = { + val isPython = mainAppResource.endsWith(".py") + val pythonResource: Option[PythonSubmissionResourcesImpl] = + if (isPython) { + Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs)) + } else None + // Since you might need jars for SQL UDFs in PySpark + def sparkJarFilter(): Seq[String] = + pythonResource.map {p => p.sparkJars}.getOrElse( + Option(mainAppResource) + .filterNot(_ == SparkLauncher.NO_RESOURCE) + .toSeq) val sparkJars = sparkConf.getOption("spark.jars") .map(_.split(",")) - .getOrElse(Array.empty[String]) ++ - Option(mainAppResource) - .filterNot(_ == SparkLauncher.NO_RESOURCE) - .toSeq + .getOrElse(Array.empty[String]) ++ sparkJarFilter() val launchTime = System.currentTimeMillis val sparkFiles = sparkConf.getOption("spark.files") .map(_.split(",")) .getOrElse(Array.empty[String]) + val pySparkFilesOption = pythonResource.map {p => p.pySparkFiles} + validateNoDuplicateFileNames(sparkJars) + validateNoDuplicateFileNames(sparkFiles) + pySparkFilesOption.foreach {b => validateNoDuplicateFileNames(b)} + val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String]) val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") // The resource name prefix is derived from the application name, making it easy to connect the // names of the Kubernetes resources from e.g. 
Kubectl or the Kubernetes dashboard to the @@ -326,6 +336,7 @@ private[spark] object Client { namespace, sparkJars, sparkFiles, + pySparkFiles, sslOptionsProvider.getSslOptions) Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( master, @@ -346,11 +357,11 @@ private[spark] object Client { appName, kubernetesResourceNamePrefix, kubernetesAppId, + mainAppResource, + pythonResource, mainClass, sparkConf, appArgs, - sparkJars, - sparkFiles, waitForAppCompletion, kubernetesClient, initContainerComponentsProvider, @@ -358,4 +369,14 @@ private[spark] object Client { loggingPodStatusWatcher).run() } } + private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = { + val fileNamesToUris = allFiles.map { file => + (new File(Utils.resolveURI(file).getPath).getName, file) + } + fileNamesToUris.groupBy(_._1).foreach { + case (fileName, urisWithFileName) => + require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" + + s" file name $fileName is shared by all of these URIs: $urisWithFileName") + } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala index c635484c4c124..c31aa5f306bea 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala @@ -24,14 +24,19 @@ private[spark] trait ContainerLocalizedFilesResolver { def resolveSubmittedAndRemoteSparkJars(): Seq[String] def resolveSubmittedSparkJars(): Seq[String] def resolveSubmittedSparkFiles(): Seq[String] + def resolveSubmittedPySparkFiles(): Seq[String] + def resolvePrimaryResourceFile(): String } private[spark] class ContainerLocalizedFilesResolverImpl( sparkJars: Seq[String], sparkFiles: Seq[String], + pySparkFiles: Seq[String], + primaryPyFile: String, jarsDownloadPath: String, filesDownloadPath: String) extends ContainerLocalizedFilesResolver { + override def resolveSubmittedAndRemoteSparkJars(): Seq[String] = { sparkJars.map { jar => val jarUri = Utils.resolveURI(jar) @@ -53,16 +58,30 @@ private[spark] class ContainerLocalizedFilesResolverImpl( resolveSubmittedFiles(sparkFiles, filesDownloadPath) } - private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = { - files.map { file => - val fileUri = Utils.resolveURI(file) - Option(fileUri.getScheme).getOrElse("file") match { - case "file" => - val fileName = new File(fileUri.getPath).getName - s"$downloadPath/$fileName" - case _ => - file - } + override def resolveSubmittedPySparkFiles(): Seq[String] = { + def filterMainResource(x: String) = x match { + case `primaryPyFile` => None + case _ => Some(resolveFile(x, filesDownloadPath)) + } + pySparkFiles.flatMap(x => filterMainResource(x)) + } + + override def resolvePrimaryResourceFile(): String = { + Option(primaryPyFile).map(p => resolveFile(p, filesDownloadPath)).getOrElse("") + } + + private def resolveFile(file: String, downloadPath: String) = { + val fileUri = Utils.resolveURI(file) + Option(fileUri.getScheme).getOrElse("file") match { + case "file" => + val fileName = new File(fileUri.getPath).getName + s"$downloadPath/$fileName" + case _ => + file } } + + private def resolveSubmittedFiles(files: Seq[String], 
downloadPath: String): Seq[String] = { + files.map { file => resolveFile(file, downloadPath) } + } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala index cc1837cce6736..6e185d2c069f6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala @@ -32,13 +32,15 @@ import org.apache.spark.util.Utils */ private[spark] trait DriverInitContainerComponentsProvider { - def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver + def provideContainerLocalizedFilesResolver( + mainAppResource: String): ContainerLocalizedFilesResolver def provideInitContainerSubmittedDependencyUploader( driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader] def provideSubmittedDependenciesSecretBuilder( maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets]) : Option[SubmittedDependencySecretBuilder] def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap + def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds], uris: Iterable[String]): Option[InitContainerBundle] } @@ -49,6 +51,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( namespace: String, sparkJars: Seq[String], sparkFiles: Seq[String], + pySparkFiles: Seq[String], resourceStagingServerExternalSslOptions: SSLOptions) extends DriverInitContainerComponentsProvider { @@ -104,6 +107,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE) private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY) private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT) + private val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles) private def provideInitContainerConfigMap( maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = { @@ -130,7 +134,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( } new SparkInitContainerConfigMapBuilderImpl( sparkJars, - sparkFiles, + sparkFiles ++ pySparkSubmitted, jarsDownloadPath, filesDownloadPath, configMapName, @@ -138,9 +142,10 @@ private[spark] class DriverInitContainerComponentsProviderImpl( submittedDependencyConfigPlugin).build() } - override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = { + override def provideContainerLocalizedFilesResolver(mainAppResource: String) + : ContainerLocalizedFilesResolver = { new ContainerLocalizedFilesResolverImpl( - sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath) + sparkJars, sparkFiles, pySparkFiles, mainAppResource, jarsDownloadPath, filesDownloadPath) } private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = { @@ -159,7 +164,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl( namespace, stagingServerUri, sparkJars, - sparkFiles, + sparkFiles ++ pySparkSubmitted, resourceStagingServerExternalSslOptions, RetrofitClientFactoryImpl) } @@ -201,13 +206,15 @@ private[spark] class 
DriverInitContainerComponentsProviderImpl( configMapKey, resourceStagingServerSecretPlugin) } - + override def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter = { + new DriverPodKubernetesFileMounterImpl() + } override def provideInitContainerBundle( maybeSubmittedResourceIds: Option[SubmittedResourceIds], uris: Iterable[String]): Option[InitContainerBundle] = { - val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver() - // Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs - if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) { + // Bypass init-containers if `spark.jars`, `spark.files` and `--py-files` + // are empty or only have `local://` URIs + if ((KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty) { Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds), provideInitContainerBootstrap(), provideExecutorInitContainerConfiguration())) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala new file mode 100644 index 0000000000000..cc0ef0eedb457 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.kubernetes.submit + +import io.fabric8.kubernetes.api.model.{Container, PodBuilder} + +import org.apache.spark.deploy.kubernetes.constants._ + + /** + * Trait that is responsible for providing full file-paths dynamically after + * the filesDownloadPath has been defined. The file-names are then stored in the + * environment variables in the driver pod.
+ */ +private[spark] trait DriverPodKubernetesFileMounter { + def addPySparkFiles(primaryFile: String, pySparkFiles: String, + mainContainerName: String, originalPodSpec: PodBuilder) : PodBuilder +} + +private[spark] class DriverPodKubernetesFileMounterImpl() + extends DriverPodKubernetesFileMounter { + override def addPySparkFiles( + primaryFile: String, + pySparkFiles: String, + mainContainerName: String, + originalPodSpec: PodBuilder): PodBuilder = { + + originalPodSpec + .editSpec() + .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName)) + .addNewEnv() + .withName(ENV_PYSPARK_PRIMARY) + .withValue(primaryFile) + .endEnv() + .addNewEnv() + .withName(ENV_PYSPARK_FILES) + .withValue(pySparkFiles) + .endEnv() + .endContainer() + .endSpec() + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala new file mode 100644 index 0000000000000..c61e930a2b97f --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} + +private[spark] trait PythonSubmissionResources { + def sparkJars: Seq[String] + def pySparkFiles: Array[String] + def arguments: Array[String] + def primaryPySparkResource(containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) + : String + def driverPodWithPySparkEnvs( + driverPodFileMounter: DriverPodKubernetesFileMounter, + resolvedPrimaryPySparkResource: String, + resolvedPySparkFiles: String, + driverContainerName: String, + driverPodBuilder: PodBuilder): Pod +} + +private[spark] class PythonSubmissionResourcesImpl( + private val mainAppResource: String, + private val appArgs: Array[String] ) extends PythonSubmissionResources { + + private val pyFiles: Array[String] = { + Option(appArgs(0)).map(a => mainAppResource +: a.split(",")) + .getOrElse(Array(mainAppResource)) + } + + override def sparkJars: Seq[String] = Seq.empty[String] + + override def pySparkFiles: Array[String] = pyFiles + + override def arguments: Array[String] = { + pyFiles.toList match { + case Nil => appArgs + case a :: b => a match { + case _ if a == mainAppResource && b == Nil => appArgs + case _ => appArgs.drop(1) + } + } + } + override def primaryPySparkResource( + containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) : String = + containerLocalizedFilesResolver.resolvePrimaryResourceFile() + + override def driverPodWithPySparkEnvs( + driverPodFileMounter: DriverPodKubernetesFileMounter, + resolvedPrimaryPySparkResource: String, + resolvedPySparkFiles: String, + driverContainerName: String, + driverPodBuilder: PodBuilder) : Pod = { + driverPodFileMounter + .addPySparkFiles( + resolvedPrimaryPySparkResource, + resolvedPySparkFiles, + driverContainerName, + driverPodBuilder) + .build() + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala index 8992a56e20c80..a58a37691f4eb 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala @@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit import java.io.File -import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder} +import io.fabric8.kubernetes.api.model._ import io.fabric8.kubernetes.client.{KubernetesClient, Watch} import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource} import org.hamcrest.{BaseMatcher, Description} @@ -27,10 +27,10 @@ import org.mockito.Matchers.{any, anyVararg, argThat, eq => mockitoEq} import org.mockito.Mockito.{times, verify, when} import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer -import org.scalatest.BeforeAndAfter +import org.scalatest.{BeforeAndAfter, Matchers} + import scala.collection.JavaConverters._ import scala.collection.mutable - import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.kubernetes.{KubernetesExternalShuffleService, KubernetesShuffleBlockHandler, SparkPodInitContainerBootstrap} import org.apache.spark.deploy.kubernetes.config._ @@ -63,6 +63,7 @@ class ClientV2Suite extends 
SparkFunSuite with BeforeAndAfter { private val INIT_CONTAINER_SECRET_NAME = "init-container-secret" private val INIT_CONTAINER_SECRET_DATA = Map("secret-key" -> "secret-data") private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" + private val PYSPARK_APP_ARGS = Array(null, "500") private val APP_ARGS = Array("3", "20") private val SPARK_JARS = Seq( "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar") @@ -72,6 +73,20 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { "/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar") private val SPARK_FILES = Seq( "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt") + private val PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "file:///app/files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py", + "file:///app/files/file5.py") + private val RESOLVED_PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "/var/spark-data/spark-files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py") + private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py" + private val RESOLVED_PYSPARK_PRIMARY_FILE = "/var/spark-data/spark-file/file5.py" + private val RESOLVED_SPARK_FILES = Seq( "hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt") private val INIT_CONTAINER_SECRET = new SecretBuilder() @@ -138,7 +153,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { @Mock private var kubernetesClient: KubernetesClient = _ @Mock - private var podOps: MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] = _ + private var podOps: MixedOperation[ + Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] = _ private type ResourceListOps = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[ HasMetadata, java.lang.Boolean] @Mock @@ -146,6 +162,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { @Mock private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _ @Mock + private var fileMounter: DriverPodKubernetesFileMounter = _ + @Mock private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _ @Mock private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _ @@ -169,8 +187,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .endMetadata() } }) - when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver()) - .thenReturn(containerLocalizedFilesResolver) + when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver( + any[String])).thenReturn(containerLocalizedFilesResolver) + when(initContainerComponentsProvider.provideDriverPodFileMounter()) + .thenReturn(fileMounter) when(submittedDependenciesSecretBuilder.build()) .thenReturn(INIT_CONTAINER_SECRET) when(kubernetesClient.pods()).thenReturn(podOps) @@ -178,14 +198,30 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { override def answer(invocation: InvocationOnMock): Pod = { new PodBuilder(invocation.getArgumentAt(0, classOf[Pod])) .editMetadata() - .withUid(DRIVER_POD_UID) + .withUid(DRIVER_POD_UID) .endMetadata() - .withKind(DRIVER_POD_KIND) + .withKind(DRIVER_POD_KIND) .withApiVersion(DRIVER_POD_API_VERSION) .build() } }) when(podOps.withName(s"$APP_RESOURCE_PREFIX-driver")).thenReturn(namedPodResource) + when(fileMounter.addPySparkFiles( + mockitoEq(RESOLVED_PYSPARK_PRIMARY_FILE), + mockitoEq(RESOLVED_PYSPARK_FILES.mkString(",")), + any[String], + 
any())).thenAnswer( new Answer[PodBuilder] { + override def answer(invocation: InvocationOnMock) : PodBuilder = { + invocation.getArgumentAt(3, classOf[PodBuilder]) + .editMetadata() + .withUid(DRIVER_POD_UID) + .withName(s"$APP_RESOURCE_PREFIX-driver") + .addToLabels("pyspark-test", "true") + .endMetadata() + .withKind(DRIVER_POD_KIND) + .withApiVersion(DRIVER_POD_API_VERSION) + } + }) when(namedPodResource.watch(loggingPodStatusWatcher)).thenReturn(watch) when(containerLocalizedFilesResolver.resolveSubmittedAndRemoteSparkJars()) .thenReturn(RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS) @@ -193,6 +229,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { .thenReturn(RESOLVED_SPARK_JARS) when(containerLocalizedFilesResolver.resolveSubmittedSparkFiles()) .thenReturn(RESOLVED_SPARK_FILES) + when(containerLocalizedFilesResolver.resolvePrimaryResourceFile()) + .thenReturn(RESOLVED_PYSPARK_PRIMARY_FILE) + when(containerLocalizedFilesResolver.resolveSubmittedPySparkFiles()) + .thenReturn(RESOLVED_PYSPARK_FILES) when(executorInitContainerConfiguration.configureSparkConfForExecutorInitContainer(SPARK_CONF)) .thenReturn(SPARK_CONF_WITH_EXECUTOR_INIT_CONF) when(kubernetesClient.resourceList(anyVararg[HasMetadata]())).thenReturn(resourceListOps) @@ -208,9 +248,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { when(initContainerComponentsProvider .provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets()))) .thenReturn(Some(submittedDependenciesSecretBuilder)) - when(initContainerComponentsProvider.provideInitContainerBundle(Some(SUBMITTED_RESOURCES.ids()), - RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES)) - .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, + when(initContainerComponentsProvider.provideInitContainerBundle(mockitoEq( + Option(SUBMITTED_RESOURCES.ids())), + mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES))) + .thenReturn(Option(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, initContainerBootstrap, executorInitContainerConfiguration))) runAndVerifyDriverPodHasCorrectProperties() val resourceListArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata]) @@ -292,11 +333,11 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { APP_NAME, APP_RESOURCE_PREFIX, APP_ID, + "", + None, MAIN_CLASS, SPARK_CONF, APP_ARGS, - SPARK_JARS, - SPARK_FILES, true, kubernetesClient, initContainerComponentsProvider, @@ -305,6 +346,20 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { verify(loggingPodStatusWatcher).awaitCompletion() } + test("Mounting environmental variables correctly onto Driver Pod for PySpark Jobs") { + expectationsForNoMountedCredentials() + expectationsForNoDependencyUploader() + expectationsForNoSparkJarsOrFiles() + runAndVerifyDriverPodHasCorrectPySparkProperties() + } + + private def expectationsForNoSparkJarsOrFiles(): Unit = { + when(containerLocalizedFilesResolver.resolveSubmittedSparkFiles()) + .thenReturn(Nil) + when(containerLocalizedFilesResolver.resolveSubmittedSparkJars()) + .thenReturn(Nil) + } + private def expectationsForNoDependencyUploader(): Unit = { when(initContainerComponentsProvider .provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS)) @@ -312,8 +367,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { when(initContainerComponentsProvider .provideSubmittedDependenciesSecretBuilder(None)) .thenReturn(None) - when(initContainerComponentsProvider.provideInitContainerBundle(None, RESOLVED_SPARK_JARS ++ - RESOLVED_SPARK_FILES)) + 
when(initContainerComponentsProvider.provideInitContainerBundle(mockitoEq(None), + mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES))) .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, initContainerBootstrap, executorInitContainerConfiguration))) } @@ -373,16 +428,28 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { } } + private def runAndVerifyDriverPodHasCorrectPySparkProperties(): Unit = { + when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver( + mockitoEq(PYSPARK_PRIMARY_FILE))).thenReturn(containerLocalizedFilesResolver) + when(initContainerComponentsProvider.provideInitContainerBundle( + any[Option[SubmittedResourceIds]], any[Iterable[String]])) + .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP, + initContainerBootstrap, executorInitContainerConfiguration))) + runAndVerifyPySparkPodMatchesPredicate { p => + Option(p).exists(pod => containerHasCorrectPySparkEnvs(pod)) + } + } + private def runAndVerifyPodMatchesPredicate(pred: (Pod => Boolean)): Unit = { new Client( APP_NAME, APP_RESOURCE_PREFIX, APP_ID, + "", + None, MAIN_CLASS, SPARK_CONF, APP_ARGS, - SPARK_JARS, - SPARK_FILES, false, kubernetesClient, initContainerComponentsProvider, @@ -434,6 +501,15 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { expectedBasicEnvs.toSet.subsetOf(envs.toSet) } + private def containerHasCorrectPySparkEnvs(pod: Pod): Boolean = { + val driverPodLabels = + pod.getMetadata.getLabels.asScala.map(env => (env._1.toString, env._2.toString)) + val expectedBasicLabels = Map( + "pyspark-test" -> "true", + "spark-role" -> "driver") + expectedBasicLabels.toSet.subsetOf(driverPodLabels.toSet) + } + private def containerHasCorrectBasicContainerConfiguration(pod: Pod): Boolean = { val containers = pod.getSpec.getContainers.asScala containers.size == 1 && @@ -450,4 +526,33 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter { BOOTSTRAPPED_POD_ANNOTATION -> TRUE) pod.getMetadata.getAnnotations.asScala == expectedAnnotations } + + private def runAndVerifyPySparkPodMatchesPredicate(pred: (Pod => Boolean)): Unit = { + new Client( + APP_NAME, + APP_RESOURCE_PREFIX, + APP_ID, + PYSPARK_PRIMARY_FILE, + Option(new PythonSubmissionResourcesImpl(PYSPARK_PRIMARY_FILE, PYSPARK_APP_ARGS)), + MAIN_CLASS, + SPARK_CONF, + PYSPARK_APP_ARGS, + false, + kubernetesClient, + initContainerComponentsProvider, + credentialsMounterProvider, + loggingPodStatusWatcher).run() + val podMatcher = new BaseMatcher[Pod] { + override def matches(o: scala.Any): Boolean = { + o match { + case p: Pod => pred(p) + case _ => false + } + } + override def describeTo(description: Description): Unit = {} + } + verify(podOps).create(argThat(podMatcher)) + } } + + diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala index ca5cd1fff9b74..7e51abcd7b8e0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala @@ -29,11 +29,20 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite { "file:///app/files/file2.txt", "local:///app/files/file3.txt", "http://app/files/file4.txt") + private val 
PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "file:///app/files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py", + "file:///app/files/file5.py") private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars" private val FILES_DOWNLOAD_PATH = "/var/data/spark-files" + private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py" private val localizedFilesResolver = new ContainerLocalizedFilesResolverImpl( SPARK_JARS, SPARK_FILES, + PYSPARK_FILES, + PYSPARK_PRIMARY_FILE, JARS_DOWNLOAD_PATH, FILES_DOWNLOAD_PATH) @@ -66,4 +75,19 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite { "http://app/files/file4.txt") assert(resolvedFiles === expectedResolvedFiles) } + test("Submitted PySpark files should resolve to the download path.") { + val resolvedPySparkFiles = localizedFilesResolver.resolveSubmittedPySparkFiles() + val expectedPySparkFiles = Seq( + "hdfs://localhost:9000/app/files/file1.py", + s"$FILES_DOWNLOAD_PATH/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py") + assert(resolvedPySparkFiles === expectedPySparkFiles) + } + test("Submitted PySpark Primary resource should resolve to the download path.") { + val resolvedPySparkPrimary = + localizedFilesResolver.resolvePrimaryResourceFile() + val expectedPySparkPrimary = s"$FILES_DOWNLOAD_PATH/file5.py" + assert(resolvedPySparkPrimary === expectedPySparkPrimary) + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala new file mode 100644 index 0000000000000..9b60b7ef2b786 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.kubernetes.submit + +import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.kubernetes.config._ + +import scala.collection.JavaConverters._ +import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder} +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter + +private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite with BeforeAndAfter { + private val PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "file:///app/files/file2.py", + "local:///app/files/file3.py", + "http://app/files/file4.py", + "file:///app/files/file5.py") + private val RESOLVED_PYSPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.py", + "/var/spark-data/spark-files/file2.py", + "local:///app/file`s/file3.py", + "http://app/files/file4.py") + private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py" + private val RESOLVED_PYSPARK_PRIMARY_FILE = "/var/data/spark-files/file5.py" + + private val pyFilesResource = new PythonSubmissionResourcesImpl( + PYSPARK_PRIMARY_FILE, Array(PYSPARK_FILES.mkString(","), "500") + ) + private val pyResource = new PythonSubmissionResourcesImpl( + PYSPARK_PRIMARY_FILE, Array(null, "500") + ) + private val DRIVER_CONTAINER_NAME = "pyspark_container" + private val driverContainer = new ContainerBuilder() + .withName(DRIVER_CONTAINER_NAME) + .build() + private val basePodBuilder = new PodBuilder() + .withNewMetadata() + .withName("base_pod") + .endMetadata() + .withNewSpec() + .addToContainers(driverContainer) + .endSpec() + + @Mock + private var driverInitContainer: DriverInitContainerComponentsProviderImpl = _ + @Mock + private var localizedFileResolver: ContainerLocalizedFilesResolverImpl = _ + before { + MockitoAnnotations.initMocks(this) + when(driverInitContainer.provideDriverPodFileMounter()).thenReturn( + new DriverPodKubernetesFileMounterImpl() + ) + when(localizedFileResolver.resolvePrimaryResourceFile()).thenReturn( + RESOLVED_PYSPARK_PRIMARY_FILE) + } + test("Test with --py-files included") { + assert(pyFilesResource.sparkJars === Seq.empty[String]) + assert(pyFilesResource.pySparkFiles === + PYSPARK_PRIMARY_FILE +: PYSPARK_FILES) + assert(pyFilesResource.primaryPySparkResource(localizedFileResolver) === + RESOLVED_PYSPARK_PRIMARY_FILE) + val driverPod: Pod = pyFilesResource.driverPodWithPySparkEnvs( + driverInitContainer.provideDriverPodFileMounter(), + RESOLVED_PYSPARK_PRIMARY_FILE, + RESOLVED_PYSPARK_FILES.mkString(","), + DRIVER_CONTAINER_NAME, + basePodBuilder + ) + val driverContainer = driverPod.getSpec.getContainers.asScala.head + val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)).toMap + envs.get("PYSPARK_PRIMARY") foreach{ a => assert (a === RESOLVED_PYSPARK_PRIMARY_FILE) } + envs.get("PYSPARK_FILES") foreach{ a => assert (a === RESOLVED_PYSPARK_FILES.mkString(",")) } + } + + test("Test without --py-files") { + assert(pyResource.sparkJars === Seq.empty[String]) + assert(pyResource.pySparkFiles === Array(PYSPARK_PRIMARY_FILE)) + assert(pyResource.primaryPySparkResource(localizedFileResolver) === + RESOLVED_PYSPARK_PRIMARY_FILE) + val driverPod: Pod = pyResource.driverPodWithPySparkEnvs( + driverInitContainer.provideDriverPodFileMounter(), + RESOLVED_PYSPARK_PRIMARY_FILE, + "", + DRIVER_CONTAINER_NAME, + basePodBuilder + ) + val driverContainer = driverPod.getSpec.getContainers.asScala.head + val envs = driverContainer.getEnv.asScala.map(env => 
(env.getName, env.getValue)).toMap + envs.get("PYSPARK_PRIMARY") foreach{ a => assert (a === RESOLVED_PYSPARK_PRIMARY_FILE) } + envs.get("PYSPARK_FILES") foreach{ a => assert (a === "") } + } +} \ No newline at end of file diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile new file mode 100644 index 0000000000000..6dcc7511c0dd9 --- /dev/null +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-driver-py:latest -f dockerfiles/driver-py/Dockerfile . + +ADD examples /opt/spark/examples +ADD python /opt/spark/python + +RUN apk add --no-cache python && \ + python -m ensurepip && \ + rm -r /usr/lib/python*/ensurepip && \ + pip install --upgrade pip setuptools && \ + rm -r /root/.cache +# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES +# RUN apk add --update alpine-sdk python-dev +# RUN pip install numpy + +ENV PYTHON_VERSION 2.7.13 +ENV PYSPARK_PYTHON python +ENV PYSPARK_DRIVER_PYTHON python +ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH} + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ + if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + exec /sbin/tini -- ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH \ + -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY \ + $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile new file mode 100644 index 0000000000000..7a65a4f879376 --- /dev/null +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile @@ -0,0 +1,46 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-executor-py:latest -f dockerfiles/executor-py/Dockerfile . + +ADD examples /opt/spark/examples +ADD python /opt/spark/python + +RUN apk add --no-cache python && \ + python -m ensurepip && \ + rm -r /usr/lib/python*/ensurepip && \ + pip install --upgrade pip setuptools && \ + rm -r /root/.cache +# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES +# RUN apk add --update alpine-sdk python-dev +# RUN pip install numpy + +ENV PYTHON_VERSION 2.7.13 +ENV PYSPARK_PYTHON python +ENV PYSPARK_DRIVER_PYTHON python +ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH} + +# TODO support spark.executor.extraClassPath +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ + if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ + exec /sbin/tini -- ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP \ No newline at end of file diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile index 6bff06da12840..4bafe25e2608f 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile @@ -19,6 +19,6 @@ FROM spark-base # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. E.g.: -# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile . +# docker build -t spark-init:latest -f dockerfiles/init-container/Dockerfile .
ENTRYPOINT [ "/sbin/tini", "--", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkDependencyDownloadInitContainer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile index c9a92fa1c5b62..9ca96be0f1a88 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile @@ -17,8 +17,9 @@ FROM spark-base + # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. E.g.: -# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile . +# docker build -t spark-resource-staging-server:latest -f dockerfiles/resource-staging-server/Dockerfile . ENTRYPOINT [ "/sbin/tini", "--", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.ResourceStagingServer" ] diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile index 7f4e2aa51b67d..ccb2f1a03d88c 100644 --- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile +++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile @@ -19,7 +19,7 @@ FROM spark-base # If this docker file is being used in the context of building your images from a Spark distribution, the docker build # command should be invoked from the top level directory of the Spark distribution. E.g.: -# docker build -t spark-shuffle:latest -f dockerfiles/shuffle/Dockerfile . +# docker build -t spark-shuffle:latest -f dockerfiles/shuffle-service/Dockerfile . 
COPY examples /opt/spark/examples diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml index bbf4b02cdaaf9..cd3ccad0a2b22 100644 --- a/resource-managers/kubernetes/integration-tests/pom.xml +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -220,6 +220,108 @@ + + copy-integration-python + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/python + + + ${project.parent.basedir}/python + + ${project.parent.basedir}/python/.egg + ${project.parent.basedir}/python/dist + + + + + + + copy-integration-data + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/data + + + ${project.parent.basedir}/data + true + + + + + + copy-integration-licenses + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/licenses + + + ${project.parent.basedir}/licenses + true + + + + + + copy-integration-examples-jar + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/examples/jars + + + ${project.parent.basedir}/examples/target/scala-2.11/jars + true + + + + + + copy-integration-examples-src + pre-integration-test + + copy-resources + + + ${project.build.directory}/docker/examples/src/main + + + ${project.parent.basedir}/examples/src/main + true + + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + 1.6 + + + create-release-file + pre-integration-test + + run + + + + + + + diff --git a/resource-managers/kubernetes/integration-tests/src/test/python/pi.py b/resource-managers/kubernetes/integration-tests/src/test/python/pi.py new file mode 100755 index 0000000000000..e3f0c4aeef1b7 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/python/pi.py @@ -0,0 +1,46 @@ +from __future__ import print_function +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import sys +from random import random +from operator import add + +from pyspark.sql import SparkSession + + +if __name__ == "__main__": + """ + Usage: pi [partitions] + """ + spark = SparkSession\ + .builder\ + .appName("PythonPi")\ + .getOrCreate() + + partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2 + n = 100000 * partitions + + def f(_): + x = random() * 2 - 1 + y = random() * 2 - 1 + return 1 if x ** 2 + y ** 2 < 1 else 0 + + count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add) + print("Pi is roughly %f" % (4.0 * count / n)) + + spark.stop() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index e377f285eb9a6..d2082291eba22 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -72,6 +72,34 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { kubernetesTestComponents.deleteNamespace() } + test("Run PySpark Job on file from SUBMITTER") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + launchStagingServer(SSLOptions(), None) + sparkConf + .set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) + .set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) + + runPySparkPiAndVerifyCompletion( + PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION) + } + + test("Run PySpark Job on file from CONTAINER with spark.jar defined") { + assume(testBackend.name == MINIKUBE_TEST_BACKEND) + + sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)) + sparkConf + .set(DRIVER_DOCKER_IMAGE, + System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest")) + .set(EXECUTOR_DOCKER_IMAGE, + System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest")) + + runPySparkPiAndVerifyCompletion( + PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION) + } + test("Simple submission test with the resource staging server.") { assume(testBackend.name == MINIKUBE_TEST_BACKEND) @@ -223,6 +251,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { appResource, SPARK_PI_MAIN_CLASS, "Pi is roughly 3", Array.empty[String]) } + private def runPySparkPiAndVerifyCompletion( + appResource: String): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, PYSPARK_PI_MAIN_CLASS, "Pi is roughly 3", + Array(null, "5")) + } + private def runSparkApplicationAndVerifyCompletion( appResource: String, mainClass: String, @@ -305,11 +340,14 @@ private[spark] object KubernetesSuite { s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}" val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" + s"integration-tests-jars/${HELPER_JAR_FILE.getName}" - val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.SparkPiWithInfiniteWait" + val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner" + val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = + 
"local:///opt/spark/examples/src/main/python/pi.py" + val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py" val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + ".integrationtest.jobs.FileExistenceTest" val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" + diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala index 4db19478f44bc..e240fcf953f8c 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala @@ -16,21 +16,32 @@ */ package org.apache.spark.deploy.kubernetes.integrationtest.docker +import java.io.File import java.net.URI import java.nio.file.Paths +import scala.collection.JavaConverters._ + import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler} import org.apache.http.client.utils.URIBuilder import org.scalatest.concurrent.{Eventually, PatienceConfiguration} import org.scalatest.time.{Minutes, Seconds, Span} -private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, String]) { +import org.apache.spark.internal.Logging +import org.apache.spark.util.RedirectThread + + + +private[spark] class SparkDockerImageBuilder + (private val dockerEnv: Map[String, String]) extends Logging{ private val DOCKER_BUILD_PATH = Paths.get("target", "docker") // Dockerfile paths must be relative to the build path. 
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile" private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile" + private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile" private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile" + private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile" private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile" private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile" private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile" @@ -61,9 +72,25 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, def buildSparkDockerImages(): Unit = { Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() } + // Build the PySpark source distribution (sdist) that is copied into the -py Docker images + val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON") + .orElse(sys.env.get("PYSPARK_PYTHON")) + .getOrElse("/usr/bin/python") + val builder = new ProcessBuilder( + Seq(pythonExec, "setup.py", "sdist").asJava) + builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python")) + builder.redirectErrorStream(true) // Merge stderr into stdout so the redirect thread below captures both streams + val process = builder.start() + new RedirectThread(process.getInputStream, System.out, "redirect output").start() + val exitCode = process.waitFor() + if (exitCode != 0) { + logError(s"Building the PySpark sdist failed with exit code $exitCode") + } buildImage("spark-base", BASE_DOCKER_FILE) buildImage("spark-driver", DRIVER_DOCKER_FILE) + buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE) buildImage("spark-executor", EXECUTOR_DOCKER_FILE) + buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE) buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE) buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE) buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)
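// For context on the buildImage calls above: the helper itself is unchanged by this patch
// and does not appear in the hunk. Assuming it follows the usual Spotify docker-client
// pattern (an assumption, not code from this change), it builds each tag from the staged
// target/docker tree roughly as sketched here. The real builder presumably constructs its
// client from dockerEnv (e.g. Minikube's docker-env settings) rather than fromEnv(), and
// the object name below is hypothetical.
import java.nio.file.Paths

import com.spotify.docker.client.{DefaultDockerClient, LoggingBuildHandler}

object DockerImageBuildSketch {
  def main(args: Array[String]): Unit = {
    // Images are built from the same directory the Dockerfile paths above are relative to.
    val buildPath = Paths.get("target", "docker")
    val dockerClient = DefaultDockerClient.fromEnv().build()
    // Builds the new PySpark driver image; the resulting spark-driver-py:latest tag is the
    // default the integration tests look up via the spark.docker.test.driverImage property.
    dockerClient.build(
      buildPath,
      "spark-driver-py",
      "dockerfiles/driver-py/Dockerfile",
      new LoggingBuildHandler())
  }
}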