This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Python Bindings for launching PySpark Jobs from the JVM #364

Merged: 29 commits, Jul 3, 2017
d3cf58f
Adding PySpark Submit functionality. Launching Python from JVM
ifilonenko Jun 16, 2017
bafc13c
Addressing scala idioms related to PR351
ifilonenko Jun 17, 2017
59d9f0a
Removing extends Logging which was necessary for LogInfo
ifilonenko Jun 17, 2017
4daf634
Refactored code to leverage the ContainerLocalizedFileResolver
ifilonenko Jun 20, 2017
51105ca
Modified Unit tests so that they would pass
ifilonenko Jun 20, 2017
bd30f40
Modified Unit Test input to pass Unit Tests
ifilonenko Jun 20, 2017
720776e
Setup working environment for integration tests for PySpark
ifilonenko Jun 21, 2017
4b5f470
Comment out Python thread logic until Jenkins has python in Python
ifilonenko Jun 21, 2017
1361a26
Modifying PythonExec to pass on Jenkins
ifilonenko Jun 21, 2017
0abc3b1
Modifying python exec
ifilonenko Jun 21, 2017
0869b07
Added unit tests to ClientV2 and refactored to include pyspark submis…
ifilonenko Jun 23, 2017
38d48ce
Merge branch 'branch-2.1-kubernetes' of https://github.com/apache-spa…
ifilonenko Jun 23, 2017
9bf7b9d
Modified unit test check
ifilonenko Jun 23, 2017
4561194
Scalastyle
ifilonenko Jun 23, 2017
2cf96cc
Merged with PR 348 and added further tests and minor documentation
ifilonenko Jun 23, 2017
eb1079a
PR 348 file conflicts
ifilonenko Jun 23, 2017
4a6b779
Refactored unit tests and styles
ifilonenko Jun 28, 2017
363919a
further scala styling and logic
ifilonenko Jun 28, 2017
9c7adb1
Modified unit tests to be more specific towards Class in question
ifilonenko Jun 28, 2017
0388aa4
Removed space delimiting for methods
ifilonenko Jun 28, 2017
6acab03
Merge branch 'branch-2.1-kubernetes' of https://github.com/apache-spa…
ifilonenko Jun 29, 2017
5499f6d
Submission client redesign to use a step-based builder pattern.
mccheah Jun 29, 2017
e103225
Don't add the init-container step if all URIs are local.
mccheah Jun 30, 2017
4533df2
Python arguments patch + tests + docs
ifilonenko Jun 30, 2017
cc289f1
Revert "Python arguments patch + tests + docs"
mccheah Jun 30, 2017
c267286
Revert "Don't add the init-container step if all URIs are local."
mccheah Jun 30, 2017
8045c94
Revert "Submission client redesign to use a step-based builder pattern."
mccheah Jun 30, 2017
41b6b8c
style changes
ifilonenko Jun 30, 2017
923f956
space for styling
ifilonenko Jun 30, 2017
1 change: 1 addition & 0 deletions README.md
@@ -24,6 +24,7 @@ We've been asked by an Apache Spark Committer to work outside of the Apache infr

This is a collaborative effort by several folks from different companies who are interested in seeing this feature be successful. Companies active in this project include (alphabetically):

- Bloomberg
- Google
- Haiwen
- Hyperpilot
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -335,8 +335,8 @@ object SparkSubmit {
(clusterManager, deployMode) match {
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, CLUSTER) if args.isPython || args.isR =>
printErrorAndExit("Kubernetes does not currently support python or R applications.")
case (KUBERNETES, CLUSTER) if args.isR =>
printErrorAndExit("Kubernetes does not currently support R applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -620,8 +620,14 @@ object SparkSubmit {

if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
childArgs += args.primaryResource
childArgs += args.mainClass
if (args.isPython) {
childArgs += args.primaryResource
childArgs += "org.apache.spark.deploy.PythonRunner"
childArgs += args.pyFiles
} else {
childArgs += args.primaryResource
childArgs += args.mainClass
}
childArgs ++= args.childArgs
}
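For Python applications, the hunk above changes what the Kubernetes `Client` receives as its child arguments: the primary resource, then `org.apache.spark.deploy.PythonRunner` as the driver entry point, then the `--py-files` value, followed by the user's arguments. A rough sketch of that wiring (in Python for brevity; the helper name is illustrative, not part of SparkSubmit):

```python
def build_child_args(primary_resource, main_class, is_python, py_files, user_args):
    """Mimic the childArgs ordering used for Kubernetes cluster mode."""
    if is_python:
        # Python apps: primary script, then the JVM class that launches the
        # Python process, then the comma-separated --py-files value.
        base = [primary_resource, "org.apache.spark.deploy.PythonRunner", py_files]
    else:
        # JVM apps: primary resource followed by the user's main class.
        base = [primary_resource, main_class]
    return base + list(user_args)
```

Note that the user arguments are appended unchanged in both branches, matching `childArgs ++= args.childArgs`.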

26 changes: 26 additions & 0 deletions docs/running-on-kubernetes.md
@@ -180,6 +180,32 @@ The above mechanism using `kubectl proxy` can be used when we have authenticatio
kubernetes-client library does not support. Authentication using X509 Client Certs and OAuth tokens
is currently supported.

### Running PySpark

Running PySpark on Kubernetes uses the same spark-submit logic as launching on YARN and Mesos.
Additional Python files can be distributed to the driver and executors by listing them with the `--py-files` option.

Below is an example submission:


```
bin/spark-submit \
--deploy-mode cluster \
--master k8s://http://127.0.0.1:8001 \
--kubernetes-namespace default \
--conf spark.executor.memory=500m \
--conf spark.driver.memory=1G \
--conf spark.driver.cores=1 \
--conf spark.executor.cores=1 \
--conf spark.executor.instances=1 \
--conf spark.app.name=spark-pi \
--conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
--conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
--conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
--py-files local:///opt/spark/examples/src/main/python/sort.py \
  local:///opt/spark/examples/src/main/python/pi.py 100
```

Review comment on the `--py-files` line: I know this is unrelated, but so happy to have local support :)

## Dynamic Executor Scaling

Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running
@@ -67,6 +67,8 @@ package object constants {
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"

// Bootstrapping dependencies with the init-container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
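The two new constants carry the PySpark file locations into the driver container as environment variables: `PYSPARK_PRIMARY` holds the resolved primary script and `PYSPARK_FILES` a comma-separated list of the other submitted files. A hypothetical sketch of how a consumer of those variables might parse them (the helper is illustrative, not code from this PR):

```python
PYSPARK_PRIMARY = "PYSPARK_PRIMARY"
PYSPARK_FILES = "PYSPARK_FILES"

def read_pyspark_env(env):
    """Return (primary, files) from a driver-container environment mapping.

    An unset or empty PYSPARK_FILES value means "no extra files".
    """
    primary = env.get(PYSPARK_PRIMARY)
    raw = env.get(PYSPARK_FILES, "")
    files = [f for f in raw.split(",") if f]  # drop empty fragments
    return primary, files
```

In the real driver these values would come from `os.environ`; a plain mapping is used here so the sketch is testable.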
@@ -47,11 +47,11 @@ private[spark] class Client(
appName: String,
kubernetesResourceNamePrefix: String,
kubernetesAppId: String,
mainAppResource: String,
pythonResource: Option[PythonSubmissionResourcesImpl],
mainClass: String,
sparkConf: SparkConf,
appArgs: Array[String],
sparkJars: Seq[String],
sparkFiles: Seq[String],
waitForAppCompletion: Boolean,
kubernetesClient: KubernetesClient,
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
@@ -82,9 +82,7 @@
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)

def run(): Unit = {
validateNoDuplicateFileNames(sparkJars)
validateNoDuplicateFileNames(sparkFiles)

val arguments = (pythonResource map {p => p.arguments}).getOrElse(appArgs)
val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
sparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX,
@@ -136,7 +134,7 @@
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.mkString(" "))
.withValue(arguments.mkString(" "))
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
@@ -182,10 +180,13 @@
.map(_.build())

val containerLocalizedFilesResolver = initContainerComponentsProvider
.provideContainerLocalizedFilesResolver()
.provideContainerLocalizedFilesResolver(mainAppResource)
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()

val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles()
val resolvedPrimaryPySparkResource = pythonResource.map {
p => p.primaryPySparkResource(containerLocalizedFilesResolver)
}.getOrElse("")
val initContainerBundler = initContainerComponentsProvider
.provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()),
resolvedSparkJars ++ resolvedSparkFiles)
@@ -221,7 +222,7 @@
val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
case (confKey, confValue) => s"-D$confKey=$confValue"
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
@@ -233,7 +234,15 @@ .endEnv()
.endEnv()
.endContainer()
.endSpec()
.build()
val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter()
val resolvedDriverPod = pythonResource.map {
p => p.driverPodWithPySparkEnvs(
driverPodFileMounter,
resolvedPrimaryPySparkResource,
resolvedPySparkFiles.mkString(","),
driverContainer.getName,
resolvedDriverPodBuilder
)}.getOrElse(resolvedDriverPodBuilder.build())
Utils.tryWithResource(
kubernetesClient
.pods()
@@ -271,17 +280,6 @@ }
}
}
}

private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
val fileNamesToUris = allFiles.map { file =>
(new File(Utils.resolveURI(file).getPath).getName, file)
}
fileNamesToUris.groupBy(_._1).foreach {
case (fileName, urisWithFileName) =>
require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
s" file name $fileName is shared by all of these URIs: $urisWithFileName")
}
}
}

private[spark] object Client {
@@ -292,22 +290,34 @@
val appArgs = args.drop(2)
run(sparkConf, mainAppResource, mainClass, appArgs)
}

def run(
sparkConf: SparkConf,
mainAppResource: String,
mainClass: String,
appArgs: Array[String]): Unit = {
val isPython = mainAppResource.endsWith(".py")
val pythonResource: Option[PythonSubmissionResourcesImpl] =
if (isPython) {
Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs))
} else None
// Since you might need jars for SQL UDFs in PySpark
def sparkJarFilter(): Seq[String] =
pythonResource.map {p => p.sparkJars}.getOrElse(
Option(mainAppResource)
.filterNot(_ == SparkLauncher.NO_RESOURCE)
.toSeq)
val sparkJars = sparkConf.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
Option(mainAppResource)
.filterNot(_ == SparkLauncher.NO_RESOURCE)
.toSeq
.getOrElse(Array.empty[String]) ++ sparkJarFilter()
val launchTime = System.currentTimeMillis
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val pySparkFilesOption = pythonResource.map {p => p.pySparkFiles}
validateNoDuplicateFileNames(sparkJars)
validateNoDuplicateFileNames(sparkFiles)
pySparkFilesOption.foreach {b => validateNoDuplicateFileNames(b)}
val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String])
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The resource name prefix is derived from the application name, making it easy to connect the
// names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
@@ -326,6 +336,7 @@
namespace,
sparkJars,
sparkFiles,
pySparkFiles,
sslOptionsProvider.getSslOptions)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
@@ -346,16 +357,26 @@
appName,
kubernetesResourceNamePrefix,
kubernetesAppId,
mainAppResource,
pythonResource,
mainClass,
sparkConf,
appArgs,
sparkJars,
sparkFiles,
waitForAppCompletion,
kubernetesClient,
initContainerComponentsProvider,
kubernetesCredentialsMounterProvider,
loggingPodStatusWatcher).run()
}
}
private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
val fileNamesToUris = allFiles.map { file =>
(new File(Utils.resolveURI(file).getPath).getName, file)
}
fileNamesToUris.groupBy(_._1).foreach {
case (fileName, urisWithFileName) =>
require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
s" file name $fileName is shared by all of these URIs: $urisWithFileName")
}
}
}
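`validateNoDuplicateFileNames` (moved into the companion object in this hunk) rejects submissions where two URIs share a file name, since every submitter-local file lands in a single download directory inside the container. A small Python analogue of the check, assuming standard URI parsing (the function name is illustrative):

```python
from urllib.parse import urlparse
import posixpath

def duplicate_file_names(all_files):
    """Return the file names shared by more than one submitted URI.

    Mirrors the requirement that jars, files, and PySpark files each have
    unique base names; an empty result means the submission is valid.
    """
    names = [posixpath.basename(urlparse(f).path) for f in all_files]
    return sorted({n for n in names if names.count(n) > 1})
```

The real Scala code raises via `require` on the first duplicate; returning the offending names makes the sketch easier to exercise.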
@@ -24,14 +24,19 @@ private[spark] trait ContainerLocalizedFilesResolver {
def resolveSubmittedAndRemoteSparkJars(): Seq[String]
def resolveSubmittedSparkJars(): Seq[String]
def resolveSubmittedSparkFiles(): Seq[String]
def resolveSubmittedPySparkFiles(): Seq[String]
def resolvePrimaryResourceFile(): String
}

private[spark] class ContainerLocalizedFilesResolverImpl(
sparkJars: Seq[String],
sparkFiles: Seq[String],
pySparkFiles: Seq[String],
primaryPyFile: String,
    // Review comment: Should this be an option?
jarsDownloadPath: String,
filesDownloadPath: String) extends ContainerLocalizedFilesResolver {


override def resolveSubmittedAndRemoteSparkJars(): Seq[String] = {
sparkJars.map { jar =>
val jarUri = Utils.resolveURI(jar)
@@ -53,16 +58,30 @@
resolveSubmittedFiles(sparkFiles, filesDownloadPath)
}

private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
files.map { file =>
val fileUri = Utils.resolveURI(file)
Option(fileUri.getScheme).getOrElse("file") match {
case "file" =>
val fileName = new File(fileUri.getPath).getName
s"$downloadPath/$fileName"
case _ =>
file
}
override def resolveSubmittedPySparkFiles(): Seq[String] = {
def filterMainResource(x: String) = x match {
case `primaryPyFile` => None
case _ => Some(resolveFile(x, filesDownloadPath))
}
pySparkFiles.flatMap(x => filterMainResource(x))
}

override def resolvePrimaryResourceFile(): String = {
Option(primaryPyFile).map(p => resolveFile(p, filesDownloadPath)).getOrElse("")
}

private def resolveFile(file: String, downloadPath: String) = {
val fileUri = Utils.resolveURI(file)
Option(fileUri.getScheme).getOrElse("file") match {
case "file" =>
val fileName = new File(fileUri.getPath).getName
s"$downloadPath/$fileName"
case _ =>
file
}
}

private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
files.map { file => resolveFile(file, downloadPath) }
}
}
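`resolveFile` rewrites submitter-local URIs (scheme `file`, or no scheme) to the in-container download directory while leaving `local://` and remote URIs untouched, and `resolveSubmittedPySparkFiles` additionally filters the primary resource out of the `--py-files` list. A Python sketch of the same resolution rules (helper names are illustrative, not the project's API):

```python
from urllib.parse import urlparse
import posixpath

def resolve_file(file, download_path):
    """Map a submitter-local URI to its download location in the container.

    "file" scheme (or no scheme) is rewritten to download_path/<name>;
    anything else (local://, hdfs://, http://) is returned unchanged.
    """
    parsed = urlparse(file)
    scheme = parsed.scheme or "file"
    if scheme == "file":
        return f"{download_path}/{posixpath.basename(parsed.path)}"
    return file

def resolve_pyspark_files(files, primary, download_path):
    """Resolve --py-files, dropping the primary resource from the list."""
    return [resolve_file(f, download_path) for f in files if f != primary]
```

The primary resource is excluded because it is resolved separately by `resolvePrimaryResourceFile` and passed to the driver on its own.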
@@ -32,13 +32,15 @@ import org.apache.spark.util.Utils
*/
private[spark] trait DriverInitContainerComponentsProvider {

def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver
def provideContainerLocalizedFilesResolver(
    // Review comment: Remove extra space after )
mainAppResource: String): ContainerLocalizedFilesResolver
def provideInitContainerSubmittedDependencyUploader(
driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader]
def provideSubmittedDependenciesSecretBuilder(
maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets])
: Option[SubmittedDependencySecretBuilder]
def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap
def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter
def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle]
}
@@ -49,6 +51,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
namespace: String,
sparkJars: Seq[String],
sparkFiles: Seq[String],
pySparkFiles: Seq[String],
resourceStagingServerExternalSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {

@@ -104,6 +107,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
private val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles)

private def provideInitContainerConfigMap(
maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = {
@@ -130,17 +134,18 @@
}
new SparkInitContainerConfigMapBuilderImpl(
sparkJars,
sparkFiles,
sparkFiles ++ pySparkSubmitted,
jarsDownloadPath,
filesDownloadPath,
configMapName,
configMapKey,
submittedDependencyConfigPlugin).build()
}

override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
override def provideContainerLocalizedFilesResolver(mainAppResource: String)
: ContainerLocalizedFilesResolver = {
new ContainerLocalizedFilesResolverImpl(
sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath)
sparkJars, sparkFiles, pySparkFiles, mainAppResource, jarsDownloadPath, filesDownloadPath)
}

private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
@@ -159,7 +164,7 @@
namespace,
stagingServerUri,
sparkJars,
sparkFiles,
sparkFiles ++ pySparkSubmitted,
resourceStagingServerExternalSslOptions,
RetrofitClientFactoryImpl)
}
@@ -201,13 +206,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
configMapKey,
resourceStagingServerSecretPlugin)
}

override def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter = {
new DriverPodKubernetesFileMounterImpl()
}
override def provideInitContainerBundle(
maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle] = {
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
// Bypass init-containers if `spark.jars` and `spark.files` and `--py-files`
    // Review comment: I know this is pre-existing but why?
// are empty or only have `local://` URIs
if ((KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty) {
    // Review comment: So right now pySparkSubmitted is defined as
    // `val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles)`,
    // which seems like not what is desired, given the previous comment and the
    // logic used for the non-container-local files beforehand.
Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds),
provideInitContainerBootstrap(),
provideExecutorInitContainerConfiguration()))
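`provideInitContainerBundle` skips the init-container only when nothing needs to be fetched into the pod, that is, when every jar and file URI is container-local (`local://`) and there are no submitter-local PySpark files. A Python sketch of that decision (the function name is illustrative):

```python
def needs_init_container(uris, submitter_local_py_files):
    """True when at least one jar, file, or submitter-local PySpark file
    must be downloaded into the pod by the init-container."""
    non_container_local = [u for u in uris if not u.startswith("local://")]
    return bool(non_container_local or list(submitter_local_py_files))
```

This matches the `(KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty` guard above; the reviewer's open question about whether `getOnlySubmitterLocalFiles` is the right filter for the PySpark list applies here as well.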