This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Python Bindings for launching PySpark Jobs from the JVM (#364)
* Adding PySpark Submit functionality. Launching Python from JVM

* Addressing scala idioms related to PR351

* Removing extends Logging which was necessary for LogInfo

* Refactored code to leverage the ContainerLocalizedFileResolver

* Modified Unit tests so that they would pass

* Modified Unit Test input to pass Unit Tests

* Set up working environment for integration tests for PySpark

* Comment out Python thread logic until Jenkins has python in Python

* Modifying PythonExec to pass on Jenkins

* Modifying python exec

* Added unit tests to ClientV2 and refactored to include pyspark submission resources

* Modified unit test check

* Scalastyle

* PR 348 file conflicts

* Refactored unit tests and styles

* further scala styling and logic

* Modified unit tests to be more specific towards Class in question

* Removed space delimiting for methods

* Submission client redesign to use a step-based builder pattern.

This change overhauls the underlying architecture of the submission
client, but it is intended to entirely preserve existing behavior of
Spark applications. Therefore users will find this to be an invisible
change.

The philosophy behind this design is to reconsider the breakdown of the
submission process. It operates off the abstraction of "submission
steps", which are transformation functions that take the previous state
of the driver and return the new state of the driver. The driver's state
includes its Spark configurations and the Kubernetes resources that will
be used to deploy it.

Such a refactor moves away from a features-first API design, which
considers different containers to serve a set of features. The previous
design, for example, had a container files resolver API object that
returned different resolutions of the dependencies added by the user.
However, it was up to the main Client to know how to intelligently
invoke all of those APIs. Therefore the API surface area of the file
resolver became untenably large and it was not intuitive how it was
to be used or extended.

This design changes the encapsulation layout; every module is now
responsible for changing the driver specification directly. An
orchestrator builds the correct chain of steps and hands it to the
client, which then calls it verbatim. The main client then makes any
final modifications that put the different pieces of the driver
together, particularly to attach the driver container itself to the pod
and to apply the Spark configuration as command-line arguments.

* Don't add the init-container step if all URIs are local.

* Python arguments patch + tests + docs

* Revert "Python arguments patch + tests + docs"

This reverts commit 4533df2.

* Revert "Don't add the init-container step if all URIs are local."

This reverts commit e103225.

* Revert "Submission client redesign to use a step-based builder pattern."

This reverts commit 5499f6d.

* style changes

* space for styling
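
The "submission steps" idea described in the commit message above (and later reverted in this same commit) can be illustrated with a minimal sketch. Everything here is hypothetical: `DriverSpec`, `SubmissionStep`, the two example steps, and `orchestrate` are invented names for illustration and are not the classes from the reverted change. The sketch only shows the shape of the pattern: each step maps the previous driver state to a new one, an orchestrator picks the chain, and the client folds over it.

```scala
object SubmissionStepsSketch {
  // Hypothetical, simplified driver state: Spark configuration plus the names
  // of the Kubernetes resources that would be deployed with the driver.
  final case class DriverSpec(
      sparkConf: Map[String, String],
      kubernetesResources: Seq[String])

  // A submission step transforms the previous driver state into the next one.
  trait SubmissionStep {
    def configure(spec: DriverSpec): DriverSpec
  }

  final class SetAppNameStep(appName: String) extends SubmissionStep {
    override def configure(spec: DriverSpec): DriverSpec =
      spec.copy(sparkConf = spec.sparkConf + ("spark.app.name" -> appName))
  }

  final class InitContainerStep(configMapName: String) extends SubmissionStep {
    override def configure(spec: DriverSpec): DriverSpec =
      spec.copy(kubernetesResources = spec.kubernetesResources :+ configMapName)
  }

  // The orchestrator decides which steps apply. Here the init-container step
  // is left out when every URI is container-local (`local://`).
  def orchestrate(appName: String, uris: Seq[String]): Seq[SubmissionStep] = {
    val base = Seq[SubmissionStep](new SetAppNameStep(appName))
    if (uris.forall(_.startsWith("local://"))) base
    else base :+ new InitContainerStep(s"$appName-init-config")
  }

  // The client runs the chain verbatim, threading the state through each step.
  def runSteps(steps: Seq[SubmissionStep], initial: DriverSpec): DriverSpec =
    steps.foldLeft(initial)((spec, step) => step.configure(spec))
}
```

Because each step only sees and returns a `DriverSpec`, adding a feature in this style means adding a step rather than widening a shared resolver API.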
ifilonenko authored and foxish committed Jul 24, 2017
1 parent 9dc5eed commit 442490a
Showing 21 changed files with 831 additions and 73 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -24,6 +24,7 @@ We've been asked by an Apache Spark Committer to work outside of the Apache infr

This is a collaborative effort by several folks from different companies who are interested in seeing this feature be successful. Companies active in this project include (alphabetically):

- Bloomberg
- Google
- Haiwen
- Hyperpilot
14 changes: 10 additions & 4 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -346,8 +346,8 @@ object SparkSubmit extends CommandLineUtils {
(clusterManager, deployMode) match {
case (KUBERNETES, CLIENT) =>
printErrorAndExit("Client mode is currently not supported for Kubernetes.")
case (KUBERNETES, CLUSTER) if args.isPython || args.isR =>
printErrorAndExit("Kubernetes does not currently support python or R applications.")
case (KUBERNETES, CLUSTER) if args.isR =>
printErrorAndExit("Kubernetes does not currently support R applications.")
case (STANDALONE, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python " +
"applications on standalone clusters.")
@@ -636,8 +636,14 @@ object SparkSubmit extends CommandLineUtils {

if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
childArgs += args.primaryResource
childArgs += args.mainClass
if (args.isPython) {
childArgs += args.primaryResource
childArgs += "org.apache.spark.deploy.PythonRunner"
childArgs += args.pyFiles
} else {
childArgs += args.primaryResource
childArgs += args.mainClass
}
childArgs ++= args.childArgs
}
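
The argument layout this hunk produces for the Kubernetes submission client can be sketched in isolation. `SubmitArgs` below is a hypothetical stand-in for the handful of `SparkSubmitArguments` fields the hunk reads; only the ordering of the resulting arguments is taken from the diff.

```scala
object ChildArgsSketch {
  // Hypothetical stand-in for the SparkSubmitArguments fields used in the hunk.
  final case class SubmitArgs(
      primaryResource: String,
      mainClass: String,
      pyFiles: String,
      isPython: Boolean,
      childArgs: Seq[String])

  val PythonRunnerClass = "org.apache.spark.deploy.PythonRunner"

  // Python apps: primary resource, PythonRunner as the main class, then the
  // --py-files value. JVM apps: primary resource, then the user's main class.
  // The application's own arguments follow in both cases.
  def kubernetesChildArgs(args: SubmitArgs): Seq[String] = {
    val head =
      if (args.isPython) Seq(args.primaryResource, PythonRunnerClass, args.pyFiles)
      else Seq(args.primaryResource, args.mainClass)
    head ++ args.childArgs
  }
}
```

On the receiving side, `Client.main` in this commit reads the first two entries as the main resource and main class and treats the rest as application arguments, so for Python submissions the `--py-files` value arrives as the first of those remaining arguments.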

26 changes: 26 additions & 0 deletions docs/running-on-kubernetes.md
@@ -180,6 +180,32 @@ The above mechanism using `kubectl proxy` can be used when we have authenticatio
kubernetes-client library does not support. Authentication using X509 Client Certs and OAuth tokens
is currently supported.

### Running PySpark

Running PySpark on Kubernetes leverages the same spark-submit logic that is used when launching on YARN and Mesos.
Additional Python files can be distributed by passing them with the `--py-files` option.

Below is an example submission:


```
bin/spark-submit \
--deploy-mode cluster \
--master k8s://http://127.0.0.1:8001 \
--kubernetes-namespace default \
--conf spark.executor.memory=500m \
--conf spark.driver.memory=1G \
--conf spark.driver.cores=1 \
--conf spark.executor.cores=1 \
--conf spark.executor.instances=1 \
--conf spark.app.name=spark-pi \
--conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
--conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
--conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
--py-files local:///opt/spark/examples/src/main/python/sort.py \
local:///opt/spark/examples/src/main/python/pi.py 100
```

## Dynamic Executor Scaling

Spark on Kubernetes supports Dynamic Allocation with cluster mode. This mode requires running
Original file line number Diff line number Diff line change
@@ -67,6 +67,8 @@ package object constants {
private[spark] val ENV_DRIVER_ARGS = "SPARK_DRIVER_ARGS"
private[spark] val ENV_DRIVER_JAVA_OPTS = "SPARK_DRIVER_JAVA_OPTS"
private[spark] val ENV_MOUNTED_FILES_DIR = "SPARK_MOUNTED_FILES_DIR"
private[spark] val ENV_PYSPARK_FILES = "PYSPARK_FILES"
private[spark] val ENV_PYSPARK_PRIMARY = "PYSPARK_PRIMARY"

// Bootstrapping dependencies with the init-container
private[spark] val INIT_CONTAINER_ANNOTATION = "pod.beta.kubernetes.io/init-containers"
Original file line number Diff line number Diff line change
@@ -47,11 +47,11 @@ private[spark] class Client(
appName: String,
kubernetesResourceNamePrefix: String,
kubernetesAppId: String,
mainAppResource: String,
pythonResource: Option[PythonSubmissionResourcesImpl],
mainClass: String,
sparkConf: SparkConf,
appArgs: Array[String],
sparkJars: Seq[String],
sparkFiles: Seq[String],
waitForAppCompletion: Boolean,
kubernetesClient: KubernetesClient,
initContainerComponentsProvider: DriverInitContainerComponentsProvider,
@@ -82,9 +82,7 @@ private[spark] class Client(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)

def run(): Unit = {
validateNoDuplicateFileNames(sparkJars)
validateNoDuplicateFileNames(sparkFiles)

val arguments = (pythonResource map {p => p.arguments}).getOrElse(appArgs)
val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
sparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX,
@@ -136,7 +134,7 @@ private[spark] class Client(
.endEnv()
.addNewEnv()
.withName(ENV_DRIVER_ARGS)
.withValue(appArgs.mkString(" "))
.withValue(arguments.mkString(" "))
.endEnv()
.withNewResources()
.addToRequests("cpu", driverCpuQuantity)
@@ -182,10 +180,13 @@ private[spark] class Client(
.map(_.build())

val containerLocalizedFilesResolver = initContainerComponentsProvider
.provideContainerLocalizedFilesResolver()
.provideContainerLocalizedFilesResolver(mainAppResource)
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars()
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles()

val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles()
val resolvedPrimaryPySparkResource = pythonResource.map {
p => p.primaryPySparkResource(containerLocalizedFilesResolver)
}.getOrElse("")
val initContainerBundler = initContainerComponentsProvider
.provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()),
resolvedSparkJars ++ resolvedSparkFiles)
@@ -221,7 +222,7 @@ private[spark] class Client(
val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
case (confKey, confValue) => s"-D$confKey=$confValue"
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
.addNewEnv()
.withName(ENV_MOUNTED_CLASSPATH)
@@ -233,7 +234,15 @@ private[spark] class Client(
.endEnv()
.endContainer()
.endSpec()
.build()
val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter()
val resolvedDriverPod = pythonResource.map {
p => p.driverPodWithPySparkEnvs(
driverPodFileMounter,
resolvedPrimaryPySparkResource,
resolvedPySparkFiles.mkString(","),
driverContainer.getName,
resolvedDriverPodBuilder
)}.getOrElse(resolvedDriverPodBuilder.build())
Utils.tryWithResource(
kubernetesClient
.pods()
@@ -271,17 +280,6 @@ private[spark] class Client(
}
}
}

private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
val fileNamesToUris = allFiles.map { file =>
(new File(Utils.resolveURI(file).getPath).getName, file)
}
fileNamesToUris.groupBy(_._1).foreach {
case (fileName, urisWithFileName) =>
require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
s" file name $fileName is shared by all of these URIs: $urisWithFileName")
}
}
}

private[spark] object Client {
@@ -292,22 +290,34 @@ private[spark] object Client {
val appArgs = args.drop(2)
run(sparkConf, mainAppResource, mainClass, appArgs)
}

def run(
sparkConf: SparkConf,
mainAppResource: String,
mainClass: String,
appArgs: Array[String]): Unit = {
val isPython = mainAppResource.endsWith(".py")
val pythonResource: Option[PythonSubmissionResourcesImpl] =
if (isPython) {
Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs))
} else None
// Since you might need jars for SQL UDFs in PySpark
def sparkJarFilter(): Seq[String] =
pythonResource.map {p => p.sparkJars}.getOrElse(
Option(mainAppResource)
.filterNot(_ == SparkLauncher.NO_RESOURCE)
.toSeq)
val sparkJars = sparkConf.getOption("spark.jars")
.map(_.split(","))
.getOrElse(Array.empty[String]) ++
Option(mainAppResource)
.filterNot(_ == SparkLauncher.NO_RESOURCE)
.toSeq
.getOrElse(Array.empty[String]) ++ sparkJarFilter()
val launchTime = System.currentTimeMillis
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val pySparkFilesOption = pythonResource.map {p => p.pySparkFiles}
validateNoDuplicateFileNames(sparkJars)
validateNoDuplicateFileNames(sparkFiles)
pySparkFilesOption.foreach {b => validateNoDuplicateFileNames(b)}
val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String])
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
// The resource name prefix is derived from the application name, making it easy to connect the
// names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
@@ -326,6 +336,7 @@ private[spark] object Client {
namespace,
sparkJars,
sparkFiles,
pySparkFiles,
sslOptionsProvider.getSslOptions)
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
master,
@@ -346,16 +357,26 @@ private[spark] object Client {
appName,
kubernetesResourceNamePrefix,
kubernetesAppId,
mainAppResource,
pythonResource,
mainClass,
sparkConf,
appArgs,
sparkJars,
sparkFiles,
waitForAppCompletion,
kubernetesClient,
initContainerComponentsProvider,
kubernetesCredentialsMounterProvider,
loggingPodStatusWatcher).run()
}
}
private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
val fileNamesToUris = allFiles.map { file =>
(new File(Utils.resolveURI(file).getPath).getName, file)
}
fileNamesToUris.groupBy(_._1).foreach {
case (fileName, urisWithFileName) =>
require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" +
s" file name $fileName is shared by all of these URIs: $urisWithFileName")
}
}
}
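
The duplicate-name check that this commit moves into `object Client` (and now also applies to the PySpark files) can be tried out standalone. This is a simplified sketch: it parses inputs with `java.net.URI` directly as a stand-in for Spark's `Utils.resolveURI`, so it assumes every input is already a valid URI or an absolute path. `duplicateFileNames` is a helper name introduced here for illustration.

```scala
import java.io.File
import java.net.URI

object DuplicateFileNameCheckSketch {
  // Simplified stand-in for Utils.resolveURI: assumes the input already parses
  // as a URI and keeps only the last path segment.
  private def fileName(uri: String): String =
    new File(new URI(uri).getPath).getName

  // Groups URIs by base file name and keeps only names shared by several URIs.
  def duplicateFileNames(allFiles: Seq[String]): Map[String, Seq[String]] =
    allFiles.groupBy(fileName).filter { case (_, uris) => uris.size > 1 }

  // Fails with IllegalArgumentException when two URIs share a base name.
  def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = {
    val duplicates = duplicateFileNames(allFiles)
    require(duplicates.isEmpty,
      s"Cannot add multiple files with the same name: $duplicates")
  }
}
```

The check matters because submitted files are placed side by side in a single download directory, so two URIs with the same base name would map to the same target path.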
Original file line number Diff line number Diff line change
@@ -24,14 +24,19 @@ private[spark] trait ContainerLocalizedFilesResolver {
def resolveSubmittedAndRemoteSparkJars(): Seq[String]
def resolveSubmittedSparkJars(): Seq[String]
def resolveSubmittedSparkFiles(): Seq[String]
def resolveSubmittedPySparkFiles(): Seq[String]
def resolvePrimaryResourceFile(): String
}

private[spark] class ContainerLocalizedFilesResolverImpl(
sparkJars: Seq[String],
sparkFiles: Seq[String],
pySparkFiles: Seq[String],
primaryPyFile: String,
jarsDownloadPath: String,
filesDownloadPath: String) extends ContainerLocalizedFilesResolver {


override def resolveSubmittedAndRemoteSparkJars(): Seq[String] = {
sparkJars.map { jar =>
val jarUri = Utils.resolveURI(jar)
@@ -53,16 +58,30 @@ private[spark] class ContainerLocalizedFilesResolverImpl(
resolveSubmittedFiles(sparkFiles, filesDownloadPath)
}

private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
files.map { file =>
val fileUri = Utils.resolveURI(file)
Option(fileUri.getScheme).getOrElse("file") match {
case "file" =>
val fileName = new File(fileUri.getPath).getName
s"$downloadPath/$fileName"
case _ =>
file
}
override def resolveSubmittedPySparkFiles(): Seq[String] = {
def filterMainResource(x: String) = x match {
case `primaryPyFile` => None
case _ => Some(resolveFile(x, filesDownloadPath))
}
pySparkFiles.flatMap(x => filterMainResource(x))
}

override def resolvePrimaryResourceFile(): String = {
Option(primaryPyFile).map(p => resolveFile(p, filesDownloadPath)).getOrElse("")
}

private def resolveFile(file: String, downloadPath: String) = {
val fileUri = Utils.resolveURI(file)
Option(fileUri.getScheme).getOrElse("file") match {
case "file" =>
val fileName = new File(fileUri.getPath).getName
s"$downloadPath/$fileName"
case _ =>
file
}
}

private def resolveSubmittedFiles(files: Seq[String], downloadPath: String): Seq[String] = {
files.map { file => resolveFile(file, downloadPath) }
}
}
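
The resolution rule in the resolver above can be exercised on its own. The sketch below mirrors `resolveFile` and the primary-file filtering from the diff, again using `java.net.URI` as a simplified stand-in for `Utils.resolveURI`. The object name and the download path used in the tests are assumptions for illustration.

```scala
import java.io.File
import java.net.URI

object LocalizedFileResolutionSketch {
  // Files with no scheme or a `file` scheme come from the submitter's machine,
  // so inside the container they live under the download path. Any other
  // scheme (for example `local` or `hdfs`) is passed through unchanged.
  def resolveFile(file: String, downloadPath: String): String = {
    val uri = new URI(file)
    Option(uri.getScheme).getOrElse("file") match {
      case "file" => s"$downloadPath/${new File(uri.getPath).getName}"
      case _ => file
    }
  }

  // The primary Python file is resolved separately, so it is dropped from the
  // list of additional PySpark files before the rest are resolved.
  def resolvePySparkFiles(
      pySparkFiles: Seq[String],
      primaryPyFile: String,
      downloadPath: String): Seq[String] =
    pySparkFiles.filterNot(_ == primaryPyFile).map(resolveFile(_, downloadPath))
}
```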
Original file line number Diff line number Diff line change
@@ -32,13 +32,15 @@ import org.apache.spark.util.Utils
*/
private[spark] trait DriverInitContainerComponentsProvider {

def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver
def provideContainerLocalizedFilesResolver(
mainAppResource: String): ContainerLocalizedFilesResolver
def provideInitContainerSubmittedDependencyUploader(
driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader]
def provideSubmittedDependenciesSecretBuilder(
maybeSubmittedResourceSecrets: Option[SubmittedResourceSecrets])
: Option[SubmittedDependencySecretBuilder]
def provideInitContainerBootstrap(): SparkPodInitContainerBootstrap
def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter
def provideInitContainerBundle(maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle]
}
@@ -49,6 +51,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
namespace: String,
sparkJars: Seq[String],
sparkFiles: Seq[String],
pySparkFiles: Seq[String],
resourceStagingServerExternalSslOptions: SSLOptions)
extends DriverInitContainerComponentsProvider {

@@ -104,6 +107,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
private val pySparkSubmitted = KubernetesFileUtils.getOnlySubmitterLocalFiles(pySparkFiles)

private def provideInitContainerConfigMap(
maybeSubmittedResourceIds: Option[SubmittedResourceIds]): ConfigMap = {
@@ -130,17 +134,18 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
}
new SparkInitContainerConfigMapBuilderImpl(
sparkJars,
sparkFiles,
sparkFiles ++ pySparkSubmitted,
jarsDownloadPath,
filesDownloadPath,
configMapName,
configMapKey,
submittedDependencyConfigPlugin).build()
}

override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
override def provideContainerLocalizedFilesResolver(mainAppResource: String)
: ContainerLocalizedFilesResolver = {
new ContainerLocalizedFilesResolverImpl(
sparkJars, sparkFiles, jarsDownloadPath, filesDownloadPath)
sparkJars, sparkFiles, pySparkFiles, mainAppResource, jarsDownloadPath, filesDownloadPath)
}

private def provideExecutorInitContainerConfiguration(): ExecutorInitContainerConfiguration = {
@@ -159,7 +164,7 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
namespace,
stagingServerUri,
sparkJars,
sparkFiles,
sparkFiles ++ pySparkSubmitted,
resourceStagingServerExternalSslOptions,
RetrofitClientFactoryImpl)
}
@@ -201,13 +206,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
configMapKey,
resourceStagingServerSecretPlugin)
}

override def provideDriverPodFileMounter(): DriverPodKubernetesFileMounter = {
new DriverPodKubernetesFileMounterImpl()
}
override def provideInitContainerBundle(
maybeSubmittedResourceIds: Option[SubmittedResourceIds],
uris: Iterable[String]): Option[InitContainerBundle] = {
val containerLocalizedFilesResolver = provideContainerLocalizedFilesResolver()
// Bypass init-containers if `spark.jars` and `spark.files` is empty or only has `local://` URIs
if (KubernetesFileUtils.getNonContainerLocalFiles(uris).nonEmpty) {
// Bypass init-containers if `spark.jars`, `spark.files` and `--py-files`
// are empty or only have `local://` URIs
if ((KubernetesFileUtils.getNonContainerLocalFiles(uris) ++ pySparkSubmitted).nonEmpty) {
Some(InitContainerBundle(provideInitContainerConfigMap(maybeSubmittedResourceIds),
provideInitContainerBootstrap(),
provideExecutorInitContainerConfiguration()))
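
The bypass condition in this hunk can be sketched without the Kubernetes classes. The two filters below are assumptions about what `KubernetesFileUtils.getOnlySubmitterLocalFiles` and `KubernetesFileUtils.getNonContainerLocalFiles` do, inferred from their names and the comment in the diff. Their implementations are not shown in this commit view, so treat the sketch as illustrative only.

```scala
import java.net.URI

object InitContainerBypassSketch {
  // A missing scheme is treated as `file`, as in the resolver code in this commit.
  private def scheme(uri: String): String =
    Option(new URI(uri).getScheme).getOrElse("file")

  // Assumed behaviour of getOnlySubmitterLocalFiles: keep files that live on
  // the submitter's machine.
  def submitterLocalFiles(uris: Iterable[String]): Iterable[String] =
    uris.filter(uri => scheme(uri) == "file")

  // Assumed behaviour of getNonContainerLocalFiles: keep everything that is
  // not already present in the container image (`local://`).
  def nonContainerLocalFiles(uris: Iterable[String]): Iterable[String] =
    uris.filter(uri => scheme(uri) != "local")

  // An init-container is needed only if something has to be fetched.
  def needsInitContainer(
      jarsAndFiles: Iterable[String],
      pySparkFiles: Iterable[String]): Boolean =
    (nonContainerLocalFiles(jarsAndFiles) ++ submitterLocalFiles(pySparkFiles)).nonEmpty
}
```

Under these assumptions, a submission whose jars, files and Python files are all `local://` URIs skips the init-container entirely, while a single submitter-local Python file is enough to require it.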