diff --git a/README.md b/README.md
index cf6b4fa80242b..cb747225a11d4 100644
--- a/README.md
+++ b/README.md
@@ -24,6 +24,7 @@ We've been asked by an Apache Spark Committer to work outside of the Apache infr
This is a collaborative effort by several folks from different companies who are interested in seeing this feature be successful. Companies active in this project include (alphabetically):
+- Bloomberg
- Google
- Haiwen
- Hyperpilot
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 59ccf3af24ce7..9256a9ddd9960 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -335,8 +335,8 @@ object SparkSubmit {
(clusterManager, deployMode) match {
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
- case (KUBERNETES, CLUSTER) if args.isPython || args.isR =>
- printErrorAndExit("Kubernetes does not currently support python or R applications.")
+ case (KUBERNETES, CLUSTER) if args.isR =>
+ printErrorAndExit("Kubernetes does not currently support R applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -620,8 +620,14 @@ object SparkSubmit {
if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
- childArgs += args.primaryResource
- childArgs += args.mainClass
+ if (args.isPython) {
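+        // For Python apps, PythonRunner is the driver main class and the --py-files list is
+        // forwarded as the first application argument.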
+ childArgs += args.primaryResource
+ childArgs += "org.apache.spark.deploy.PythonRunner"
+ childArgs += args.pyFiles
+ } else {
+ childArgs += args.primaryResource
+ childArgs += args.mainClass
+ }
childArgs ++= args.childArgs
}
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 3a50860f826c5..2b4e9a6f96af1 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -180,6 +180,32 @@ The above mechanism using `kubectl proxy` can be used when we have authenticatio
kubernetes-client library does not support. Authentication using X509 Client Certs and OAuth tokens
is currently supported.
+### Running PySpark
+
+Running PySpark on Kubernetes leverages the same spark-submit logic used when launching on YARN and Mesos.
+Additional Python files can be distributed to the application by passing them to `--py-files`.
+
+Below is an example submission:
+
+
+```
+ bin/spark-submit \
+ --deploy-mode cluster \
+ --master k8s://http://127.0.0.1:8001 \
+ --kubernetes-namespace default \
+ --conf spark.executor.memory=500m \
+ --conf spark.driver.memory=1G \
+ --conf spark.driver.cores=1 \
+ --conf spark.executor.cores=1 \
+ --conf spark.executor.instances=1 \
+ --conf spark.app.name=spark-pi \
+ --conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
+ --conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
+ --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
+ --py-files local:///opt/spark/examples/src/main/python/sort.py \
+ local:///opt/spark/examples/src/main/python/pi.py 100
+```
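+
+Python files that are local to the submitter (including the primary resource) are staged through the
+resource staging server and downloaded by the driver's init-container; the resolved paths are then
+exposed to the driver container through the `PYSPARK_PRIMARY` and `PYSPARK_FILES` environment variables.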
+
## Dynamic Executor Scaling
Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index f2f1136e54fe4..92f051b2ac298 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -67,6 +67,8 @@ package object constants {
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
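+  // Environment variables carrying the resolved PySpark --py-files list and primary resource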
+ private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
+ private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
// Bootstrapping dependencies with the init-container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index 8220127eac449..781ecbd6c5416 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -47,11 +47,11 @@ private[spark] class Client(
appName: String,
kubernetesResourceNamePrefix: String,
kubernetesAppId: String,
+ mainAppResource: String,
+ pythonResource: Option[PythonSubmissionResourcesImpl],
mainClass: String,
sparkConf: SparkConf,
appArgs: Array[String],
- sparkJars: Seq[String],
- sparkFiles: Seq[String],
waitForAppCompletion: Boolean,
kubernetesClient: KubernetesClient,
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
@@ -82,9 +82,7 @@ private[spark] class Client(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
def run(): Unit = {
- validateNoDuplicateFileNames(sparkJars)
- validateNoDuplicateFileNames(sparkFiles)
-
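+    // For Python applications, take the driver arguments from the Python resources, which strip
+    // the --py-files entry from the forwarded arguments.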
+ val arguments = (pythonResource map {p => p.arguments}).getOrElse(appArgs)
val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
sparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX,
@@ -136,7 +134,7 @@ private[spark] class Client(
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
- .withValue(appArgs.mkString(" "))
+ .withValue(arguments.mkString(" "))
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
@@ -182,10 +180,13 @@ private[spark] class Client(
.map(_.build())
val containerLocalizedFilesResolver = initContainerComponentsProvider
- .provideContainerLocalizedFilesResolver()
+ .provideContainerLocalizedFilesResolver(mainAppResource)
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()
-
+ val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles()
+ val resolvedPrimaryPySparkResource = pythonResource.map {
+ p => p.primaryPySparkResource(containerLocalizedFilesResolver)
+ }.getOrElse("")
val initContainerBundler = initContainerComponentsProvider
.provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()),
resolvedSparkJars ++ resolvedSparkFiles)
@@ -221,7 +222,7 @@ private[spark] class Client(
val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
case (confKey, confValue) => s"-D$confKey=$confValue"
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
- val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
+ val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
@@ -233,7 +234,15 @@ private[spark] class Client(
.endEnv()
.endContainer()
.endSpec()
- .build()
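+    // For PySpark, additionally mount the resolved primary resource and file list onto the driver
+    // container as environment variables.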
+ val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter()
+ val resolvedDriverPod = pythonResource.map {
+ p => p.driverPodWithPySparkEnvs(
+ driverPodFileMounter,
+ resolvedPrimaryPySparkResource,
+ resolvedPySparkFiles.mkString(","),
+ driverContainer.getName,
+ resolvedDriverPodBuilder
+ )}.getOrElse(resolvedDriverPodBuilder.build())
Utils.tryWithResource(
kubernetesClient
.pods()
@@ -271,17 +280,6 @@ private[spark] class Client(
}
}
}
-
- private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
- val fileNamesToUris = allFiles.map { file =>
- (new File(Utils.resolveURI(file).getPath).getName, file)
- }
- fileNamesToUris.groupBy(_._1).foreach {
- case (fileName, urisWithFileName) =>
- require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
- s" file name $fileName is shared by all of these URIs: $urisWithFileName")
- }
- }
}
private[spark] object Client {
@@ -292,22 +290,34 @@ private[spark] object Client {
val appArgs = args.drop(2)
run(sparkConf, mainAppResource, mainClass, appArgs)
}
-
def run(
sparkConf: SparkConf,
mainAppResource: String,
mainClass: String,
appArgs: Array[String]): Unit = {
+ val isPython = mainAppResource.endsWith(".py")
+ val pythonResource: Option[PythonSubmissionResourcesImpl] =
+ if (isPython) {
+ Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs))
+ } else None
+    // Keep user-supplied jars even for PySpark (e.g. jars needed by SQL UDFs), but only treat the
+    // primary resource as a jar for non-Python applications.
+ def sparkJarFilter(): Seq[String] =
+ pythonResource.map {p => p.sparkJars}.getOrElse(
+ Option(mainAppResource)
+ .filterNot(_ == SparkLauncher.NO_RESOURCE)
+ .toSeq)
val sparkJars = sparkConf.getOption("spark.jars")
.map(_.split(","))
- .getOrElse(Array.empty[String]) ++
- Option(mainAppResource)
- .filterNot(_ == SparkLauncher.NO_RESOURCE)
- .toSeq
+ .getOrElse(Array.empty[String]) ++ sparkJarFilter()
val launchTime = System.currentTimeMillis
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
+ val pySparkFilesOption = pythonResource.map {p => p.pySparkFiles}
+ validateNoDuplicateFileNames(sparkJars)
+ validateNoDuplicateFileNames(sparkFiles)
+ pySparkFilesOption.foreach {b => validateNoDuplicateFileNames(b)}
+ val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String])
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The resource name prefix is derived from the application name, making it easy to connect the
// names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
@@ -326,6 +336,7 @@ private[spark] object Client {
namespace,
sparkJars,
sparkFiles,
+ pySparkFiles,
sslOptionsProvider.getSslOptions)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
@@ -346,11 +357,11 @@ private[spark] object Client {
appName,
kubernetesResourceNamePrefix,
kubernetesAppId,
+ mainAppResource,
+ pythonResource,
mainClass,
sparkConf,
appArgs,
- sparkJars,
- sparkFiles,
waitForAppCompletion,
kubernetesClient,
initContainerComponentsProvider,
@@ -358,4 +369,14 @@ private[spark] object Client {
loggingPodStatusWatcher).run()
}
}
+ private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
+ val fileNamesToUris = allFiles.map { file =>
+ (new File(Utils.resolveURI(file).getPath).getName, file)
+ }
+ fileNamesToUris.groupBy(_._1).foreach {
+ case (fileName, urisWithFileName) =>
+ require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
+ s" file name $fileName is shared by all of these URIs: $urisWithFileName")
+ }
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala
index c635484c4c124..c31aa5f306bea 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolver.scala
@@ -24,14 +24,19 @@ private[spark] trait ContainerLocalizedFilesResolver {
def resolveSubmittedAndRemoteSparkJars(): Seq[String]
def resolveSubmittedSparkJars(): Seq[String]
def resolveSubmittedSparkFiles(): Seq[String]
+ def resolveSubmittedPySparkFiles(): Seq[String]
+ def resolvePrimaryResourceFile(): String
}
private[spark] class ContainerLocalizedFilesResolverImpl(
sparkJars: Seq[String],
sparkFiles: Seq[String],
+ pySparkFiles: Seq[String],
+ primaryPyFile: String,
jarsDownloadPath: String,
filesDownloadPath: String) extends ContainerLocalizedFilesResolver {
+
override def resolveSubmittedAndRemoteSparkJars(): Seq[String] = {
sparkJars.map { jar =>
val jarUri = Utils.resolveURI(jar)
@@ -53,16 +58,30 @@ private[spark] class ContainerLocalizedFilesResolverImpl(
resolveSubmittedFiles(sparkFiles, filesDownloadPath)
}
- private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
- files.map { file =>
- val fileUri = Utils.resolveURI(file)
- Option(fileUri.getScheme).getOrElse("file") match {
- case "file" =>
- val fileName = new File(fileUri.getPath).getName
- s"$downloadPath/$fileName"
- case _ =>
- file
- }
+ override def resolveSubmittedPySparkFiles(): Seq[String] = {
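+    // The primary resource is resolved separately via resolvePrimaryResourceFile(), so exclude it here.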
+ def filterMainResource(x: String) = x match {
+ case `primaryPyFile` => None
+ case _ => Some(resolveFile(x, filesDownloadPath))
+ }
+ pySparkFiles.flatMap(x => filterMainResource(x))
+ }
+
+ override def resolvePrimaryResourceFile(): String = {
+ Option(primaryPyFile).map(p => resolveFile(p, filesDownloadPath)).getOrElse("")
+ }
+
+ private def resolveFile(file: String, downloadPath: String) = {
+ val fileUri = Utils.resolveURI(file)
+ Option(fileUri.getScheme).getOrElse("file") match {
+ case "file" =>
+ val fileName = new File(fileUri.getPath).getName
+ s"$downloadPath/$fileName"
+ case _ =>
+ file
}
}
+
+ private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
+ files.map { file => resolveFile(file, downloadPath) }
+ }
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala
index cc1837cce6736..6e185d2c069f6 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala
@@ -32,13 +32,15 @@ import org.apache.spark.util.Utils
*/
private[spark] trait DriverInitContainerComponentsProvider {
- def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver
+ def provideContainerLocalizedFilesResolver(
+ mainAppResource: String): ContainerLocalizedFilesResolver
def provideInitContainerSubmittedDependencyUploader(
driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader]
def provideSubmittedDependenciesSecretBuilder(
maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets])
: Option[SubmittedDependencySecretBuilder]
def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap
+ def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter
def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle]
}
@@ -49,6 +51,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
namespace: String,
sparkJars: Seq[String],
sparkFiles: Seq[String],
+ pySparkFiles: Seq[String],
resourceStagingServerExternalSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {
@@ -104,6 +107,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
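+  // Only PySpark files that are local to the submitter need to be staged and downloaded by the init-container.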
+ private val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles)
private def provideInitContainerConfigMap(
maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = {
@@ -130,7 +134,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
}
new SparkInitContainerConfigMapBuilderImpl(
sparkJars,
- sparkFiles,
+ sparkFiles ++ pySparkSubmitted,
jarsDownloadPath,
filesDownloadPath,
configMapName,
@@ -138,9 +142,10 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
submittedDependencyConfigPlugin).build()
}
- override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
+ override def provideContainerLocalizedFilesResolver(mainAppResource: String)
+ : ContainerLocalizedFilesResolver = {
new ContainerLocalizedFilesResolverImpl(
- sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath)
+ sparkJars, sparkFiles, pySparkFiles, mainAppResource, jarsDownloadPath, filesDownloadPath)
}
private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
@@ -159,7 +164,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
namespace,
stagingServerUri,
sparkJars,
- sparkFiles,
+ sparkFiles ++ pySparkSubmitted,
resourceStagingServerExternalSslOptions,
RetrofitClientFactoryImpl)
}
@@ -201,13 +206,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
configMapKey,
resourceStagingServerSecretPlugin)
}
-
+ override def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter = {
+ new DriverPodKubernetesFileMounterImpl()
+ }
override def provideInitContainerBundle(
maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle] = {
- val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
- // Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
- if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
+    // Bypass init-containers if `spark.jars`, `spark.files`, and `--py-files`
+    // are empty or only contain `local://` URIs
+ if ((KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty) {
Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds),
provideInitContainerBootstrap(),
provideExecutorInitContainerConfiguration()))
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala
new file mode 100644
index 0000000000000..cc0ef0eedb457
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverPodKubernetesFileMounter.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{Container, PodBuilder}
+
+import org.apache.spark.deploy.kubernetes.constants._
+
+ /**
+  * Trait responsible for providing full file paths dynamically once the
+  * filesDownloadPath has been defined. The resolved file names are then exposed as
+  * environment variables on the driver pod.
+  */
+private[spark] trait DriverPodKubernetesFileMounter {
+ def addPySparkFiles(primaryFile: String, pySparkFiles: String,
+ mainContainerName: String, originalPodSpec: PodBuilder) : PodBuilder
+}
+
+private[spark] class DriverPodKubernetesFileMounterImpl()
+ extends DriverPodKubernetesFileMounter {
+ override def addPySparkFiles(
+ primaryFile: String,
+ pySparkFiles: String,
+ mainContainerName: String,
+ originalPodSpec: PodBuilder): PodBuilder = {
+
+ originalPodSpec
+ .editSpec()
+ .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
+ .addNewEnv()
+ .withName(ENV_PYSPARK_PRIMARY)
+ .withValue(primaryFile)
+ .endEnv()
+ .addNewEnv()
+ .withName(ENV_PYSPARK_FILES)
+ .withValue(pySparkFiles)
+ .endEnv()
+ .endContainer()
+ .endSpec()
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala
new file mode 100644
index 0000000000000..c61e930a2b97f
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResources.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+
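+/**
+ * Resources needed to submit a PySpark application: the spark jars contributed by the primary
+ * resource, the full --py-files list, the application arguments with the --py-files entry removed,
+ * and helpers for resolving the primary resource and mounting the PySpark environment variables
+ * on the driver pod.
+ */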
+private[spark] trait PythonSubmissionResources {
+ def sparkJars: Seq[String]
+ def pySparkFiles: Array[String]
+ def arguments: Array[String]
+ def primaryPySparkResource(containerLocalizedFilesResolver: ContainerLocalizedFilesResolver)
+ : String
+ def driverPodWithPySparkEnvs(
+ driverPodFileMounter: DriverPodKubernetesFileMounter,
+ resolvedPrimaryPySparkResource: String,
+ resolvedPySparkFiles: String,
+ driverContainerName: String,
+ driverPodBuilder: PodBuilder): Pod
+}
+
+private[spark] class PythonSubmissionResourcesImpl(
+ private val mainAppResource: String,
+ private val appArgs: Array[String] ) extends PythonSubmissionResources {
+
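+  // appArgs(0) carries the comma-separated --py-files value forwarded by SparkSubmit (null when absent).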
+ private val pyFiles: Array[String] = {
+ Option(appArgs(0)).map(a => mainAppResource +: a.split(","))
+ .getOrElse(Array(mainAppResource))
+ }
+
+ override def sparkJars: Seq[String] = Seq.empty[String]
+
+ override def pySparkFiles: Array[String] = pyFiles
+
+ override def arguments: Array[String] = {
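+    // If --py-files was supplied, the first application argument is that file list and is dropped;
+    // otherwise the arguments are passed through unchanged.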
+ pyFiles.toList match {
+ case Nil => appArgs
+ case a :: b => a match {
+ case _ if a == mainAppResource && b == Nil => appArgs
+ case _ => appArgs.drop(1)
+ }
+ }
+ }
+ override def primaryPySparkResource(
+ containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) : String =
+ containerLocalizedFilesResolver.resolvePrimaryResourceFile()
+
+ override def driverPodWithPySparkEnvs(
+ driverPodFileMounter: DriverPodKubernetesFileMounter,
+ resolvedPrimaryPySparkResource: String,
+ resolvedPySparkFiles: String,
+ driverContainerName: String,
+ driverPodBuilder: PodBuilder) : Pod = {
+ driverPodFileMounter
+ .addPySparkFiles(
+ resolvedPrimaryPySparkResource,
+ resolvedPySparkFiles,
+ driverContainerName,
+ driverPodBuilder)
+ .build()
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
index 8992a56e20c80..a58a37691f4eb 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
@@ -18,7 +18,7 @@ package org.apache.spark.deploy.kubernetes.submit
import java.io.File
-import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapBuilder, DoneablePod, HasMetadata, Pod, PodBuilder, PodList, Secret, SecretBuilder}
+import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.dsl.{MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
import org.hamcrest.{BaseMatcher, Description}
@@ -27,10 +27,10 @@ import org.mockito.Matchers.{any, anyVararg, argThat, eq => mockitoEq}
import org.mockito.Mockito.{times, verify, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
-import org.scalatest.BeforeAndAfter
+import org.scalatest.{BeforeAndAfter, Matchers}
+
import scala.collection.JavaConverters._
import scala.collection.mutable
-
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.kubernetes.{KubernetesExternalShuffleService, KubernetesShuffleBlockHandler, SparkPodInitContainerBootstrap}
import org.apache.spark.deploy.kubernetes.config._
@@ -63,6 +63,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
private val INIT_CONTAINER_SECRET_NAME = "init-container-secret"
private val INIT_CONTAINER_SECRET_DATA = Map("secret-key" -> "secret-data")
private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
+ private val PYSPARK_APP_ARGS = Array(null, "500")
private val APP_ARGS = Array("3", "20")
private val SPARK_JARS = Seq(
"hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar")
@@ -72,6 +73,20 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
"/var/data/spark-jars/jar1.jar", "/var/data/spark-jars/jar2.jar")
private val SPARK_FILES = Seq(
"hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt")
+ private val PYSPARK_FILES = Seq(
+ "hdfs://localhost:9000/app/files/file1.py",
+ "file:///app/files/file2.py",
+ "local:///app/files/file3.py",
+ "http://app/files/file4.py",
+ "file:///app/files/file5.py")
+ private val RESOLVED_PYSPARK_FILES = Seq(
+ "hdfs://localhost:9000/app/files/file1.py",
+ "/var/spark-data/spark-files/file2.py",
+ "local:///app/files/file3.py",
+ "http://app/files/file4.py")
+ private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py"
+ private val RESOLVED_PYSPARK_PRIMARY_FILE = "/var/spark-data/spark-file/file5.py"
+
private val RESOLVED_SPARK_FILES = Seq(
"hdfs://localhost:9000/app/files/file1.txt", "file:///var/data/spark-files/file2.txt")
private val INIT_CONTAINER_SECRET = new SecretBuilder()
@@ -138,7 +153,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var kubernetesClient: KubernetesClient = _
@Mock
- private var podOps: MixedOperation[Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] = _
+ private var podOps: MixedOperation[
+ Pod, PodList, DoneablePod, PodResource[Pod, DoneablePod]] = _
private type ResourceListOps = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
HasMetadata, java.lang.Boolean]
@Mock
@@ -146,6 +162,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var credentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider = _
@Mock
+ private var fileMounter: DriverPodKubernetesFileMounter = _
+ @Mock
private var credentialsMounter: DriverPodKubernetesCredentialsMounter = _
@Mock
private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
@@ -169,8 +187,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
.endMetadata()
}
})
- when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver())
- .thenReturn(containerLocalizedFilesResolver)
+ when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver(
+ any[String])).thenReturn(containerLocalizedFilesResolver)
+ when(initContainerComponentsProvider.provideDriverPodFileMounter())
+ .thenReturn(fileMounter)
when(submittedDependenciesSecretBuilder.build())
.thenReturn(INIT_CONTAINER_SECRET)
when(kubernetesClient.pods()).thenReturn(podOps)
@@ -178,14 +198,30 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
override def answer(invocation: InvocationOnMock): Pod = {
new PodBuilder(invocation.getArgumentAt(0, classOf[Pod]))
.editMetadata()
- .withUid(DRIVER_POD_UID)
+ .withUid(DRIVER_POD_UID)
.endMetadata()
- .withKind(DRIVER_POD_KIND)
+ .withKind(DRIVER_POD_KIND)
.withApiVersion(DRIVER_POD_API_VERSION)
.build()
}
})
when(podOps.withName(s"$APP_RESOURCE_PREFIX-driver")).thenReturn(namedPodResource)
+ when(fileMounter.addPySparkFiles(
+ mockitoEq(RESOLVED_PYSPARK_PRIMARY_FILE),
+ mockitoEq(RESOLVED_PYSPARK_FILES.mkString(",")),
+ any[String],
+ any())).thenAnswer( new Answer[PodBuilder] {
+ override def answer(invocation: InvocationOnMock) : PodBuilder = {
+ invocation.getArgumentAt(3, classOf[PodBuilder])
+ .editMetadata()
+ .withUid(DRIVER_POD_UID)
+ .withName(s"$APP_RESOURCE_PREFIX-driver")
+ .addToLabels("pyspark-test", "true")
+ .endMetadata()
+ .withKind(DRIVER_POD_KIND)
+ .withApiVersion(DRIVER_POD_API_VERSION)
+ }
+ })
when(namedPodResource.watch(loggingPodStatusWatcher)).thenReturn(watch)
when(containerLocalizedFilesResolver.resolveSubmittedAndRemoteSparkJars())
.thenReturn(RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS)
@@ -193,6 +229,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
.thenReturn(RESOLVED_SPARK_JARS)
when(containerLocalizedFilesResolver.resolveSubmittedSparkFiles())
.thenReturn(RESOLVED_SPARK_FILES)
+ when(containerLocalizedFilesResolver.resolvePrimaryResourceFile())
+ .thenReturn(RESOLVED_PYSPARK_PRIMARY_FILE)
+ when(containerLocalizedFilesResolver.resolveSubmittedPySparkFiles())
+ .thenReturn(RESOLVED_PYSPARK_FILES)
when(executorInitContainerConfiguration.configureSparkConfForExecutorInitContainer(SPARK_CONF))
.thenReturn(SPARK_CONF_WITH_EXECUTOR_INIT_CONF)
when(kubernetesClient.resourceList(anyVararg[HasMetadata]())).thenReturn(resourceListOps)
@@ -208,9 +248,10 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
when(initContainerComponentsProvider
.provideSubmittedDependenciesSecretBuilder(Some(SUBMITTED_RESOURCES.secrets())))
.thenReturn(Some(submittedDependenciesSecretBuilder))
- when(initContainerComponentsProvider.provideInitContainerBundle(Some(SUBMITTED_RESOURCES.ids()),
- RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES))
- .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
+ when(initContainerComponentsProvider.provideInitContainerBundle(mockitoEq(
+ Option(SUBMITTED_RESOURCES.ids())),
+ mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES)))
+ .thenReturn(Option(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
initContainerBootstrap, executorInitContainerConfiguration)))
runAndVerifyDriverPodHasCorrectProperties()
val resourceListArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
@@ -292,11 +333,11 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
APP_NAME,
APP_RESOURCE_PREFIX,
APP_ID,
+ "",
+ None,
MAIN_CLASS,
SPARK_CONF,
APP_ARGS,
- SPARK_JARS,
- SPARK_FILES,
true,
kubernetesClient,
initContainerComponentsProvider,
@@ -305,6 +346,20 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
verify(loggingPodStatusWatcher).awaitCompletion()
}
+  test("Mounting environment variables correctly onto Driver Pod for PySpark Jobs") {
+ expectationsForNoMountedCredentials()
+ expectationsForNoDependencyUploader()
+ expectationsForNoSparkJarsOrFiles()
+ runAndVerifyDriverPodHasCorrectPySparkProperties()
+ }
+
+ private def expectationsForNoSparkJarsOrFiles(): Unit = {
+ when(containerLocalizedFilesResolver.resolveSubmittedSparkFiles())
+ .thenReturn(Nil)
+ when(containerLocalizedFilesResolver.resolveSubmittedSparkJars())
+ .thenReturn(Nil)
+ }
+
private def expectationsForNoDependencyUploader(): Unit = {
when(initContainerComponentsProvider
.provideInitContainerSubmittedDependencyUploader(ALL_EXPECTED_LABELS))
@@ -312,8 +367,8 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
when(initContainerComponentsProvider
.provideSubmittedDependenciesSecretBuilder(None))
.thenReturn(None)
- when(initContainerComponentsProvider.provideInitContainerBundle(None, RESOLVED_SPARK_JARS ++
- RESOLVED_SPARK_FILES))
+ when(initContainerComponentsProvider.provideInitContainerBundle(mockitoEq(None),
+ mockitoEq(RESOLVED_SPARK_JARS ++ RESOLVED_SPARK_FILES)))
.thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
initContainerBootstrap, executorInitContainerConfiguration)))
}
@@ -373,16 +428,28 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
}
}
+ private def runAndVerifyDriverPodHasCorrectPySparkProperties(): Unit = {
+ when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver(
+ mockitoEq(PYSPARK_PRIMARY_FILE))).thenReturn(containerLocalizedFilesResolver)
+ when(initContainerComponentsProvider.provideInitContainerBundle(
+ any[Option[SubmittedResourceIds]], any[Iterable[String]]))
+ .thenReturn(Some(InitContainerBundle(INIT_CONTAINER_CONFIG_MAP,
+ initContainerBootstrap, executorInitContainerConfiguration)))
+ runAndVerifyPySparkPodMatchesPredicate { p =>
+ Option(p).exists(pod => containerHasCorrectPySparkEnvs(pod))
+ }
+ }
+
private def runAndVerifyPodMatchesPredicate(pred: (Pod => Boolean)): Unit = {
new Client(
APP_NAME,
APP_RESOURCE_PREFIX,
APP_ID,
+ "",
+ None,
MAIN_CLASS,
SPARK_CONF,
APP_ARGS,
- SPARK_JARS,
- SPARK_FILES,
false,
kubernetesClient,
initContainerComponentsProvider,
@@ -434,6 +501,15 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
expectedBasicEnvs.toSet.subsetOf(envs.toSet)
}
+ private def containerHasCorrectPySparkEnvs(pod: Pod): Boolean = {
+ val driverPodLabels =
+ pod.getMetadata.getLabels.asScala.map(env => (env._1.toString, env._2.toString))
+ val expectedBasicLabels = Map(
+ "pyspark-test" -> "true",
+ "spark-role" -> "driver")
+ expectedBasicLabels.toSet.subsetOf(driverPodLabels.toSet)
+ }
+
private def containerHasCorrectBasicContainerConfiguration(pod: Pod): Boolean = {
val containers = pod.getSpec.getContainers.asScala
containers.size == 1 &&
@@ -450,4 +526,33 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
BOOTSTRAPPED_POD_ANNOTATION -> TRUE)
pod.getMetadata.getAnnotations.asScala == expectedAnnotations
}
+
+ private def runAndVerifyPySparkPodMatchesPredicate(pred: (Pod => Boolean)): Unit = {
+ new Client(
+ APP_NAME,
+ APP_RESOURCE_PREFIX,
+ APP_ID,
+ PYSPARK_PRIMARY_FILE,
+ Option(new PythonSubmissionResourcesImpl(PYSPARK_PRIMARY_FILE, PYSPARK_APP_ARGS)),
+ MAIN_CLASS,
+ SPARK_CONF,
+ PYSPARK_APP_ARGS,
+ false,
+ kubernetesClient,
+ initContainerComponentsProvider,
+ credentialsMounterProvider,
+ loggingPodStatusWatcher).run()
+ val podMatcher = new BaseMatcher[Pod] {
+ override def matches(o: scala.Any): Boolean = {
+ o match {
+ case p: Pod => pred(p)
+ case _ => false
+ }
+ }
+ override def describeTo(description: Description): Unit = {}
+ }
+ verify(podOps).create(argThat(podMatcher))
+ }
}
+
+
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala
index ca5cd1fff9b74..7e51abcd7b8e0 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ContainerLocalizedFilesResolverSuite.scala
@@ -29,11 +29,20 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite {
"file:///app/files/file2.txt",
"local:///app/files/file3.txt",
"http://app/files/file4.txt")
+ private val PYSPARK_FILES = Seq(
+ "hdfs://localhost:9000/app/files/file1.py",
+ "file:///app/files/file2.py",
+ "local:///app/files/file3.py",
+ "http://app/files/file4.py",
+ "file:///app/files/file5.py")
private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars"
private val FILES_DOWNLOAD_PATH = "/var/data/spark-files"
+ private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py"
private val localizedFilesResolver = new ContainerLocalizedFilesResolverImpl(
SPARK_JARS,
SPARK_FILES,
+ PYSPARK_FILES,
+ PYSPARK_PRIMARY_FILE,
JARS_DOWNLOAD_PATH,
FILES_DOWNLOAD_PATH)
@@ -66,4 +75,19 @@ class ContainerLocalizedFilesResolverSuite extends SparkFunSuite {
"http://app/files/file4.txt")
assert(resolvedFiles === expectedResolvedFiles)
}
+ test("Submitted PySpark files should resolve to the download path.") {
+ val resolvedPySparkFiles = localizedFilesResolver.resolveSubmittedPySparkFiles()
+ val expectedPySparkFiles = Seq(
+ "hdfs://localhost:9000/app/files/file1.py",
+ s"$FILES_DOWNLOAD_PATH/file2.py",
+ "local:///app/files/file3.py",
+ "http://app/files/file4.py")
+ assert(resolvedPySparkFiles === expectedPySparkFiles)
+ }
+ test("Submitted PySpark Primary resource should resolve to the download path.") {
+ val resolvedPySparkPrimary =
+ localizedFilesResolver.resolvePrimaryResourceFile()
+ val expectedPySparkPrimary = s"$FILES_DOWNLOAD_PATH/file5.py"
+ assert(resolvedPySparkPrimary === expectedPySparkPrimary)
+ }
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala
new file mode 100644
index 0000000000000..9b60b7ef2b786
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/PythonSubmissionResourcesSuite.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.kubernetes.submit
+
+import org.apache.spark.{SSLOptions, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.kubernetes.config._
+
+import scala.collection.JavaConverters._
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, Pod, PodBuilder}
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+
+private[spark] class PythonSubmissionResourcesSuite extends SparkFunSuite with BeforeAndAfter {
+ private val PYSPARK_FILES = Seq(
+ "hdfs://localhost:9000/app/files/file1.py",
+ "file:///app/files/file2.py",
+ "local:///app/files/file3.py",
+ "http://app/files/file4.py",
+ "file:///app/files/file5.py")
+ private val RESOLVED_PYSPARK_FILES = Seq(
+ "hdfs://localhost:9000/app/files/file1.py",
+ "/var/spark-data/spark-files/file2.py",
+    "local:///app/files/file3.py",
+ "http://app/files/file4.py")
+ private val PYSPARK_PRIMARY_FILE = "file:///app/files/file5.py"
+ private val RESOLVED_PYSPARK_PRIMARY_FILE = "/var/data/spark-files/file5.py"
+
+ private val pyFilesResource = new PythonSubmissionResourcesImpl(
+ PYSPARK_PRIMARY_FILE, Array(PYSPARK_FILES.mkString(","), "500")
+ )
+ private val pyResource = new PythonSubmissionResourcesImpl(
+ PYSPARK_PRIMARY_FILE, Array(null, "500")
+ )
+ private val DRIVER_CONTAINER_NAME = "pyspark_container"
+ private val driverContainer = new ContainerBuilder()
+ .withName(DRIVER_CONTAINER_NAME)
+ .build()
+ private val basePodBuilder = new PodBuilder()
+ .withNewMetadata()
+ .withName("base_pod")
+ .endMetadata()
+ .withNewSpec()
+ .addToContainers(driverContainer)
+ .endSpec()
+
+ @Mock
+ private var driverInitContainer: DriverInitContainerComponentsProviderImpl = _
+ @Mock
+ private var localizedFileResolver: ContainerLocalizedFilesResolverImpl = _
+ before {
+ MockitoAnnotations.initMocks(this)
+ when(driverInitContainer.provideDriverPodFileMounter()).thenReturn(
+ new DriverPodKubernetesFileMounterImpl()
+ )
+ when(localizedFileResolver.resolvePrimaryResourceFile()).thenReturn(
+ RESOLVED_PYSPARK_PRIMARY_FILE)
+ }
+ test("Test with --py-files included") {
+ assert(pyFilesResource.sparkJars === Seq.empty[String])
+ assert(pyFilesResource.pySparkFiles ===
+ PYSPARK_PRIMARY_FILE +: PYSPARK_FILES)
+ assert(pyFilesResource.primaryPySparkResource(localizedFileResolver) ===
+ RESOLVED_PYSPARK_PRIMARY_FILE)
+ val driverPod: Pod = pyFilesResource.driverPodWithPySparkEnvs(
+ driverInitContainer.provideDriverPodFileMounter(),
+ RESOLVED_PYSPARK_PRIMARY_FILE,
+ RESOLVED_PYSPARK_FILES.mkString(","),
+ DRIVER_CONTAINER_NAME,
+ basePodBuilder
+ )
+ val driverContainer = driverPod.getSpec.getContainers.asScala.head
+ val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)).toMap
+ envs.get("PYSPARK_PRIMARY") foreach{ a => assert (a === RESOLVED_PYSPARK_PRIMARY_FILE) }
+ envs.get("PYSPARK_FILES") foreach{ a => assert (a === RESOLVED_PYSPARK_FILES.mkString(",")) }
+ }
+
+ test("Test without --py-files") {
+ assert(pyResource.sparkJars === Seq.empty[String])
+ assert(pyResource.pySparkFiles === Array(PYSPARK_PRIMARY_FILE))
+ assert(pyResource.primaryPySparkResource(localizedFileResolver) ===
+ RESOLVED_PYSPARK_PRIMARY_FILE)
+ val driverPod: Pod = pyResource.driverPodWithPySparkEnvs(
+ driverInitContainer.provideDriverPodFileMounter(),
+ RESOLVED_PYSPARK_PRIMARY_FILE,
+ "",
+ DRIVER_CONTAINER_NAME,
+ basePodBuilder
+ )
+ val driverContainer = driverPod.getSpec.getContainers.asScala.head
+ val envs = driverContainer.getEnv.asScala.map(env => (env.getName, env.getValue)).toMap
+ envs.get("PYSPARK_PRIMARY") foreach{ a => assert (a === RESOLVED_PYSPARK_PRIMARY_FILE) }
+ envs.get("PYSPARK_FILES") foreach{ a => assert (a === "") }
+ }
+}
\ No newline at end of file
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile
new file mode 100644
index 0000000000000..6dcc7511c0dd9
--- /dev/null
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/driver-py/Dockerfile
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t spark-driver-py:latest -f dockerfiles/driver-py/Dockerfile .
+
+ADD examples /opt/spark/examples
+ADD python /opt/spark/python
+
+RUN apk add --no-cache python && \
+ python -m ensurepip && \
+ rm -r /usr/lib/python*/ensurepip && \
+ pip install --upgrade pip setuptools && \
+ rm -r /root/.cache
+# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES
+# RUN apk add --update alpine-sdk python-dev
+# RUN pip install numpy
+
+ENV PYTHON_VERSION 2.7.13
+ENV PYSPARK_PYTHON python
+ENV PYSPARK_DRIVER_PYTHON python
+ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
+
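+# PYSPARK_PRIMARY and PYSPARK_FILES are injected into the driver pod by the submission client
+# (see DriverPodKubernetesFileMounter); SPARK_DRIVER_ARGS carries the remaining application arguments.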
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+ if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+ exec /sbin/tini -- ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH \
+ -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY \
+ $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
new file mode 100644
index 0000000000000..7a65a4f879376
--- /dev/null
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/executor-py/Dockerfile
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark distribution. E.g.:
+# docker build -t spark-executor-py:latest -f dockerfiles/executor-py/Dockerfile .
+
+ADD examples /opt/spark/examples
+ADD python /opt/spark/python
+
+RUN apk add --no-cache python && \
+ python -m ensurepip && \
+ rm -r /usr/lib/python*/ensurepip && \
+ pip install --upgrade pip setuptools && \
+ rm -r /root/.cache
+# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES
+# RUN apk add --update alpine-sdk python-dev
+# RUN pip install numpy
+
+ENV PYTHON_VERSION 2.7.13
+ENV PYSPARK_PYTHON python
+ENV PYSPARK_DRIVER_PYTHON python
+ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
+
+# TODO support spark.executor.extraClassPath
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+ if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
+ exec /sbin/tini -- ${JAVA_HOME}/bin/java -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
\ No newline at end of file
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile
index 6bff06da12840..4bafe25e2608f 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/init-container/Dockerfile
@@ -19,6 +19,6 @@ FROM spark-base
# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
-# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile .
+# docker build -t spark-init:latest -f dockerfiles/init-container/Dockerfile .
ENTRYPOINT [ "/sbin/tini", "--", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.KubernetesSparkDependencyDownloadInitContainer" ]
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile
index c9a92fa1c5b62..9ca96be0f1a88 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/resource-staging-server/Dockerfile
@@ -17,8 +17,9 @@
FROM spark-base
+
# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
-# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile .
+# docker build -t spark-resource-staging-server:latest -f dockerfiles/resource-staging-server/Dockerfile .
ENTRYPOINT [ "/sbin/tini", "--", "bin/spark-class", "org.apache.spark.deploy.rest.kubernetes.ResourceStagingServer" ]
diff --git a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile
index 7f4e2aa51b67d..ccb2f1a03d88c 100644
--- a/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile
+++ b/resource-managers/kubernetes/docker-minimal-bundle/src/main/docker/shuffle-service/Dockerfile
@@ -19,7 +19,7 @@ FROM spark-base
# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
-# docker build -t spark-shuffle:latest -f dockerfiles/shuffle/Dockerfile .
+# docker build -t spark-shuffle:latest -f dockerfiles/shuffle-service/Dockerfile .
COPY examples /opt/spark/examples
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index bbf4b02cdaaf9..cd3ccad0a2b22 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -220,6 +220,108 @@
+          <execution>
+            <id>copy-integration-python</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/docker/python</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.parent.basedir}/python</directory>
+                  <excludes>
+                    <exclude>${project.parent.basedir}/python/.egg</exclude>
+                    <exclude>${project.parent.basedir}/python/dist</exclude>
+                  </excludes>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-integration-data</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/docker/data</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.parent.basedir}/data</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-integration-licenses</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/docker/licenses</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.parent.basedir}/licenses</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-integration-examples-jar</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/docker/examples/jars</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.parent.basedir}/examples/target/scala-2.11/jars</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+          <execution>
+            <id>copy-integration-examples-src</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>copy-resources</goal>
+            </goals>
+            <configuration>
+              <outputDirectory>${project.build.directory}/docker/examples/src/main</outputDirectory>
+              <resources>
+                <resource>
+                  <directory>${project.parent.basedir}/examples/src/main</directory>
+                  <filtering>true</filtering>
+                </resource>
+              </resources>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <version>1.6</version>
+        <executions>
+          <execution>
+            <id>create-release-file</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <target>
+                <!-- Create the RELEASE marker file in the Docker build context. -->
+                <touch file="${project.build.directory}/docker/RELEASE" />
+              </target>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/python/pi.py b/resource-managers/kubernetes/integration-tests/src/test/python/pi.py
new file mode 100755
index 0000000000000..e3f0c4aeef1b7
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/python/pi.py
@@ -0,0 +1,46 @@
+from __future__ import print_function
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import sys
+from random import random
+from operator import add
+
+from pyspark.sql import SparkSession
+
+
+if __name__ == "__main__":
+ """
+ Usage: pi [partitions]
+ """
+ spark = SparkSession\
+ .builder\
+ .appName("PythonPi")\
+ .getOrCreate()
+
+ partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
+ n = 100000 * partitions
+
+ def f(_):
+ x = random() * 2 - 1
+ y = random() * 2 - 1
+ return 1 if x ** 2 + y ** 2 < 1 else 0
+
+ count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
+ print("Pi is roughly %f" % (4.0 * count / n))
+
+ spark.stop()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index e377f285eb9a6..d2082291eba22 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -72,6 +72,34 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesTestComponents.deleteNamespace()
}
+ test("Run PySpark Job on file from SUBMITTER") {
+ assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+ launchStagingServer(SSLOptions(), None)
+ sparkConf
+ .set(DRIVER_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
+ .set(EXECUTOR_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
+
+ runPySparkPiAndVerifyCompletion(
+ PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION)
+ }
+
+  test("Run PySpark Job on file from CONTAINER with spark.jars defined") {
+ assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+ sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
+ sparkConf
+ .set(DRIVER_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
+ .set(EXECUTOR_DOCKER_IMAGE,
+ System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
+
+ runPySparkPiAndVerifyCompletion(
+ PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION)
+ }
+
test("Simple submission test with the resource staging server.") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)
@@ -223,6 +251,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
appResource, SPARK_PI_MAIN_CLASS, "Pi is roughly 3", Array.empty[String])
}
+ private def runPySparkPiAndVerifyCompletion(
+ appResource: String): Unit = {
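+    // The first application argument stands in for the --py-files value forwarded by spark-submit (none here).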
+ runSparkApplicationAndVerifyCompletion(
+ appResource, PYSPARK_PI_MAIN_CLASS, "Pi is roughly 3",
+ Array(null, "5"))
+ }
+
private def runSparkApplicationAndVerifyCompletion(
appResource: String,
mainClass: String,
@@ -305,11 +340,14 @@ private[spark] object KubernetesSuite {
s"integration-tests-jars/${EXAMPLES_JAR_FILE.getName}"
val CONTAINER_LOCAL_HELPER_JAR_PATH = s"local:///opt/spark/examples/" +
s"integration-tests-jars/${HELPER_JAR_FILE.getName}"
-
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
".integrationtest.jobs.SparkPiWithInfiniteWait"
+ val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
+ val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION =
+ "local:///opt/spark/examples/src/main/python/pi.py"
+ val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py"
val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
".integrationtest.jobs.FileExistenceTest"
val GROUP_BY_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
index 4db19478f44bc..e240fcf953f8c 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/docker/SparkDockerImageBuilder.scala
@@ -16,21 +16,32 @@
*/
package org.apache.spark.deploy.kubernetes.integrationtest.docker
+import java.io.File
import java.net.URI
import java.nio.file.Paths
+import scala.collection.JavaConverters._
+
import com.spotify.docker.client.{DefaultDockerClient, DockerCertificates, LoggingBuildHandler}
import org.apache.http.client.utils.URIBuilder
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}
-private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String, String]) {
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.RedirectThread
+
+private[spark] class SparkDockerImageBuilder(
+    private val dockerEnv: Map[String, String]) extends Logging {
private val DOCKER_BUILD_PATH = Paths.get("target", "docker")
// Dockerfile paths must be relative to the build path.
private val BASE_DOCKER_FILE = "dockerfiles/spark-base/Dockerfile"
private val DRIVER_DOCKER_FILE = "dockerfiles/driver/Dockerfile"
+ private val DRIVERPY_DOCKER_FILE = "dockerfiles/driver-py/Dockerfile"
private val EXECUTOR_DOCKER_FILE = "dockerfiles/executor/Dockerfile"
+ private val EXECUTORPY_DOCKER_FILE = "dockerfiles/executor-py/Dockerfile"
private val SHUFFLE_SERVICE_DOCKER_FILE = "dockerfiles/shuffle-service/Dockerfile"
private val INIT_CONTAINER_DOCKER_FILE = "dockerfiles/init-container/Dockerfile"
private val STAGING_SERVER_DOCKER_FILE = "dockerfiles/resource-staging-server/Dockerfile"
@@ -61,9 +72,25 @@ private[spark] class SparkDockerImageBuilder(private val dockerEnv: Map[String,
def buildSparkDockerImages(): Unit = {
Eventually.eventually(TIMEOUT, INTERVAL) { dockerClient.ping() }
+ // Building Python distribution environment
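+    // Run `python setup.py sdist` in the copied python/ directory so the PySpark packaging is
+    // prepared before the Python driver and executor images are built.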
+ val pythonExec = sys.env.get("PYSPARK_DRIVER_PYTHON")
+ .orElse(sys.env.get("PYSPARK_PYTHON"))
+ .getOrElse("/usr/bin/python")
+ val builder = new ProcessBuilder(
+ Seq(pythonExec, "setup.py", "sdist").asJava)
+ builder.directory(new File(DOCKER_BUILD_PATH.toFile, "python"))
+ builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
+ val process = builder.start()
+ new RedirectThread(process.getInputStream, System.out, "redirect output").start()
+ val exitCode = process.waitFor()
+ if (exitCode != 0) {
+ logInfo(s"exitCode: $exitCode")
+ }
buildImage("spark-base", BASE_DOCKER_FILE)
buildImage("spark-driver", DRIVER_DOCKER_FILE)
+ buildImage("spark-driver-py", DRIVERPY_DOCKER_FILE)
buildImage("spark-executor", EXECUTOR_DOCKER_FILE)
+ buildImage("spark-executor-py", EXECUTORPY_DOCKER_FILE)
buildImage("spark-shuffle", SHUFFLE_SERVICE_DOCKER_FILE)
buildImage("spark-resource-staging-server", STAGING_SERVER_DOCKER_FILE)
buildImage("spark-init", INIT_CONTAINER_DOCKER_FILE)