This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Python Bindings for launching PySpark Jobs from the JVM #364

Merged: 29 commits, Jul 3, 2017
Changes shown below are from 3 of the 29 commits.

Commits (29):
d3cf58f
Adding PySpark Submit functionality. Launching Python from JVM
ifilonenko Jun 16, 2017
bafc13c
Addressing scala idioms related to PR351
ifilonenko Jun 17, 2017
59d9f0a
Removing extends Logging which was necessary for LogInfo
ifilonenko Jun 17, 2017
4daf634
Refactored code to leverage the ContainerLocalizedFileResolver
ifilonenko Jun 20, 2017
51105ca
Modified Unit tests so that they would pass
ifilonenko Jun 20, 2017
bd30f40
Modified Unit Test input to pass Unit Tests
ifilonenko Jun 20, 2017
720776e
Setup working environment for integration tests for PySpark
ifilonenko Jun 21, 2017
4b5f470
Comment out Python thread logic until Jenkins has python in Python
ifilonenko Jun 21, 2017
1361a26
Modifying PythonExec to pass on Jenkins
ifilonenko Jun 21, 2017
0abc3b1
Modifying python exec
ifilonenko Jun 21, 2017
0869b07
Added unit tests to ClientV2 and refactored to include pyspark submis…
ifilonenko Jun 23, 2017
38d48ce
Merge branch 'branch-2.1-kubernetes' of https://github.com/apache-spa…
ifilonenko Jun 23, 2017
9bf7b9d
Modified unit test check
ifilonenko Jun 23, 2017
4561194
Scalastyle
ifilonenko Jun 23, 2017
2cf96cc
Merged with PR 348 and added further tests and minor documentation
ifilonenko Jun 23, 2017
eb1079a
PR 348 file conflicts
ifilonenko Jun 23, 2017
4a6b779
Refactored unit tests and styles
ifilonenko Jun 28, 2017
363919a
further scala styling and logic
ifilonenko Jun 28, 2017
9c7adb1
Modified unit tests to be more specific towards Class in question
ifilonenko Jun 28, 2017
0388aa4
Removed space delimiting for methods
ifilonenko Jun 28, 2017
6acab03
Merge branch 'branch-2.1-kubernetes' of https://github.com/apache-spa…
ifilonenko Jun 29, 2017
5499f6d
Submission client redesign to use a step-based builder pattern.
mccheah Jun 29, 2017
e103225
Don't add the init-container step if all URIs are local.
mccheah Jun 30, 2017
4533df2
Python arguments patch + tests + docs
ifilonenko Jun 30, 2017
cc289f1
Revert "Python arguments patch + tests + docs"
mccheah Jun 30, 2017
c267286
Revert "Don't add the init-container step if all URIs are local."
mccheah Jun 30, 2017
8045c94
Revert "Submission client redesign to use a step-based builder pattern."
mccheah Jun 30, 2017
41b6b8c
style changes
ifilonenko Jun 30, 2017
923f956
space for styling
ifilonenko Jun 30, 2017
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (9 additions, 1 deletion)
@@ -621,14 +621,22 @@ object SparkSubmit {
if (isKubernetesCluster) {
childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
if (args.isPython) {
childArgs += "--py-file"
childArgs += args.primaryResource
childArgs += "--main-class"
childArgs += "org.apache.spark.deploy.PythonRunner"
childArgs += "--other-py-files"
childArgs += args.pyFiles
Review comment on the line above: No need to check if it's empty?

} else {
childArgs += "--primary-java-resource"
childArgs += args.primaryResource
childArgs += "--main-class"
childArgs += args.mainClass
}
-      childArgs ++= args.childArgs
+      args.childArgs.foreach { arg =>
+        childArgs += "--arg"
+        childArgs += arg
+      }
}

// Load any properties specified through --conf and the default properties file
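To make the translation concrete, here is a hedged sketch (not from the PR; the resource URIs and application argument are hypothetical) of the child arguments the block above builds for a PySpark submission:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical submission: primary resource main.py, --py-files deps.py, app arg "100".
val childArgs = ArrayBuffer.empty[String]
childArgs += "--py-file"
childArgs += "local:///opt/spark/examples/main.py"   // args.primaryResource
childArgs += "--main-class"
childArgs += "org.apache.spark.deploy.PythonRunner"  // JVM entry point that launches Python
childArgs += "--other-py-files"
childArgs += "local:///opt/spark/examples/deps.py"   // args.pyFiles
Seq("100").foreach { arg =>                          // args.childArgs
  childArgs += "--arg"
  childArgs += arg
}
// The resulting arguments are handed to org.apache.spark.deploy.kubernetes.submit.Client.
```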
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/InitContainerResourceStagingServerSecretPlugin.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.deploy.kubernetes

-import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret}
+import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder, Secret}

import org.apache.spark.deploy.kubernetes.constants._

@@ -27,13 +27,13 @@ private[spark] trait InitContainerResourceStagingServerSecretPlugin {
* from a resource staging server.
*/
def mountResourceStagingServerSecretIntoInitContainer(
-      initContainer: ContainerBuilder): ContainerBuilder
+      initContainer: Container): Container

/**
* Configure the pod to attach a Secret volume which hosts secret files allowing the
* init-container to retrieve dependencies from the resource staging server.
*/
-  def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder
+  def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod
}

private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
@@ -42,21 +42,25 @@ private[spark] class InitContainerResourceStagingServerSecretPluginImpl(
extends InitContainerResourceStagingServerSecretPlugin {

override def mountResourceStagingServerSecretIntoInitContainer(
-      initContainer: ContainerBuilder): ContainerBuilder = {
-    initContainer.addNewVolumeMount()
-      .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
-      .withMountPath(initContainerSecretMountPath)
-      .endVolumeMount()
+      initContainer: Container): Container = {
+    new ContainerBuilder(initContainer)
+      .addNewVolumeMount()
+        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+        .withMountPath(initContainerSecretMountPath)
+        .endVolumeMount()
+      .build()
}

-  override def addResourceStagingServerSecretVolumeToPod(basePod: PodBuilder): PodBuilder = {
-    basePod.editSpec()
-      .addNewVolume()
-        .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
-        .withNewSecret()
-          .withSecretName(initContainerSecretName)
-          .endSecret()
-        .endVolume()
-      .endSpec()
+  override def addResourceStagingServerSecretVolumeToPod(basePod: Pod): Pod = {
+    new PodBuilder(basePod)
+      .editSpec()
+        .addNewVolume()
+          .withName(INIT_CONTAINER_SECRET_VOLUME_NAME)
+          .withNewSecret()
+            .withSecretName(initContainerSecretName)
+            .endSecret()
+          .endVolume()
+        .endSpec()
+      .build()
}
}
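Trading the builder types for built Container and Pod values makes each plugin application a plain value-to-value transformation. A minimal sketch (the helper name is illustrative, not from the PR) of how an optional plugin now composes:

```scala
import io.fabric8.kubernetes.api.model.Container

// Illustrative helper: mount the staging-server secret only when a plugin is configured.
def withOptionalSecretMount(
    initContainer: Container,
    maybePlugin: Option[InitContainerResourceStagingServerSecretPlugin]): Container = {
  maybePlugin
    .map(_.mountResourceStagingServerSecretIntoInitContainer(initContainer))
    .getOrElse(initContainer)
}
```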
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/PodWithDetachedInitContainer.scala (new file)
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.deploy.kubernetes

import io.fabric8.kubernetes.api.model.{Container, Pod}

private[spark] case class PodWithDetachedInitContainer(
pod: Pod,
initContainer: Container,
mainContainer: Container)
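As a hedged usage sketch (the names and values are illustrative), this case class lets each bootstrap step transform the pod, the not-yet-attached init-container, and the main container independently, deferring attachment to the caller:

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// Start from skeleton objects; each bootstrap step returns a transformed copy.
val start = PodWithDetachedInitContainer(
  pod = new PodBuilder()
    .withNewMetadata().withName("spark-driver-pod").endMetadata()
    .withNewSpec().endSpec()
    .build(),
  initContainer = new ContainerBuilder().build(),
  mainContainer = new ContainerBuilder().withName("spark-driver").build())
// e.g. val bootstrapped = bootstrap.bootstrapInitContainerAndVolumes(start)
```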
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/SparkPodInitContainerBootstrap.scala
@@ -19,19 +19,25 @@ package org.apache.spark.deploy.kubernetes
import io.fabric8.kubernetes.api.model.{ContainerBuilder, EmptyDirVolumeSource, PodBuilder, VolumeMount, VolumeMountBuilder}

import org.apache.spark.deploy.kubernetes.constants._
-import org.apache.spark.deploy.kubernetes.submit.{ContainerNameEqualityPredicate, InitContainerUtil}

/**
* This is separated out from the init-container steps API because this component can be reused to
* set up the init-container for executors as well.
*/
private[spark] trait SparkPodInitContainerBootstrap {
/**
* Bootstraps an init-container that downloads dependencies to be used by a main container.
* Note that this primarily assumes that the init-container's configuration is being provided
* by a ConfigMap that was installed by some other component; that is, the implementation
* here makes no assumptions about how the init-container is specifically configured. For
* example, this class is unaware if the init-container is fetching remote dependencies or if
- * it is fetching dependencies from a resource staging server.
+ * it is fetching dependencies from a resource staging server. Additionally, the container itself
+ * is not actually attached to the pod, but the init container is returned so it can be attached
+ * by InitContainerUtil after the caller has decided to make any changes to it.
*/
def bootstrapInitContainerAndVolumes(
-      mainContainerName: String, originalPodSpec: PodBuilder): PodBuilder
+      originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
+    : PodWithDetachedInitContainer
}

private[spark] class SparkPodInitContainerBootstrapImpl(
@@ -41,13 +47,12 @@ private[spark] class SparkPodInitContainerBootstrapImpl(
filesDownloadPath: String,
downloadTimeoutMinutes: Long,
initContainerConfigMapName: String,
-    initContainerConfigMapKey: String,
-    resourceStagingServerSecretPlugin: Option[InitContainerResourceStagingServerSecretPlugin])
+    initContainerConfigMapKey: String)
extends SparkPodInitContainerBootstrap {

override def bootstrapInitContainerAndVolumes(
-      mainContainerName: String,
-      originalPodSpec: PodBuilder): PodBuilder = {
+      originalPodWithUnattachedInitContainer: PodWithDetachedInitContainer)
+    : PodWithDetachedInitContainer = {
val sharedVolumeMounts = Seq[VolumeMount](
new VolumeMountBuilder()
.withName(INIT_CONTAINER_DOWNLOAD_JARS_VOLUME_NAME)
@@ -58,7 +63,7 @@
.withMountPath(filesDownloadPath)
.build())

-    val initContainer = new ContainerBuilder()
+    val initContainer = new ContainerBuilder(originalPodWithUnattachedInitContainer.initContainer)
.withName(s"spark-init")
.withImage(initContainerImage)
.withImagePullPolicy(dockerImagePullPolicy)
@@ -68,11 +73,8 @@
.endVolumeMount()
.addToVolumeMounts(sharedVolumeMounts: _*)
.addToArgs(INIT_CONTAINER_PROPERTIES_FILE_PATH)
-    val resolvedInitContainer = resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.mountResourceStagingServerSecretIntoInitContainer(initContainer)
-    }.getOrElse(initContainer).build()
-    val podWithBasicVolumes = InitContainerUtil.appendInitContainer(
-      originalPodSpec, resolvedInitContainer)
+      .build()
+    val podWithBasicVolumes = new PodBuilder(originalPodWithUnattachedInitContainer.pod)
.editSpec()
.addNewVolume()
.withName(INIT_CONTAINER_PROPERTIES_FILE_VOLUME)
@@ -92,17 +94,20 @@
.withName(INIT_CONTAINER_DOWNLOAD_FILES_VOLUME_NAME)
.withEmptyDir(new EmptyDirVolumeSource())
.endVolume()
-      .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
-        .addToVolumeMounts(sharedVolumeMounts: _*)
-        .addNewEnv()
-          .withName(ENV_MOUNTED_FILES_DIR)
-          .withValue(filesDownloadPath)
-          .endEnv()
-        .endContainer()
      .endSpec()
-    resourceStagingServerSecretPlugin.map { plugin =>
-      plugin.addResourceStagingServerSecretVolumeToPod(podWithBasicVolumes)
-    }.getOrElse(podWithBasicVolumes)
+      .build()
+    val mainContainerWithMountedFiles = new ContainerBuilder(
+        originalPodWithUnattachedInitContainer.mainContainer)
+      .addToVolumeMounts(sharedVolumeMounts: _*)
+      .addNewEnv()
+        .withName(ENV_MOUNTED_FILES_DIR)
+        .withValue(filesDownloadPath)
+        .endEnv()
+      .build()
+    PodWithDetachedInitContainer(
+      podWithBasicVolumes,
+      initContainer,
+      mainContainerWithMountedFiles)
}

}
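Putting the pieces together, the caller now runs the bootstrap, optionally applies the secret plugin to the returned init-container and pod, and only then attaches the init-container. A sketch of that flow (the bindings bootstrap, maybeSecretPlugin, and podWithDetachedInitContainer are assumed to exist, and InitContainerUtil.appendInitContainer is assumed to have been updated to take the built Pod and Container, as the trait's doc comment implies):

```scala
// 1. Add download volumes and mounts; nothing is attached to the pod yet.
val bootstrapped = bootstrap.bootstrapInitContainerAndVolumes(podWithDetachedInitContainer)
// 2. Optionally let the secret plugin decorate the detached init-container and the pod.
val resolvedInitContainer = maybeSecretPlugin
  .map(_.mountResourceStagingServerSecretIntoInitContainer(bootstrapped.initContainer))
  .getOrElse(bootstrapped.initContainer)
val resolvedPod = maybeSecretPlugin
  .map(_.addResourceStagingServerSecretVolumeToPod(bootstrapped.pod))
  .getOrElse(bootstrapped.pod)
// 3. Attach the fully configured init-container to the pod.
val finalPod = InitContainerUtil.appendInitContainer(resolvedPod, resolvedInitContainer)
```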
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -418,6 +418,14 @@ package object config extends Logging {
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_REMOTE_PYSPARK_FILES =
ConfigBuilder("spark.kubernetes.initcontainer.remotePyFiles")
.doc("Comma-separated list of Python file URIs to download in the init-container. This is" +
" calculated given the list of python files sent to spark-submit.")
.internal()
.stringConf
.createOptional

private[spark] val INIT_CONTAINER_DOCKER_IMAGE =
ConfigBuilder("spark.kubernetes.initcontainer.docker.image")
.doc("Image for the driver and executor's init-container that downloads dependencies.")
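For reference, a brief sketch (the URIs are hypothetical) of how an internal option like this is round-tripped through SparkConf: the submission client records the remote PySpark file URIs, and the init-container later reads them back:

```scala
import org.apache.spark.SparkConf

// Writer side (submission client): record the remote PySpark files to fetch.
val conf = new SparkConf(false)
  .set("spark.kubernetes.initcontainer.remotePyFiles",
    "https://example.com/jobs/etl.py,https://example.com/jobs/helpers.py")

// Reader side (init-container): comma-separated URIs, absent if none were given.
val remotePyFiles: Option[String] =
  conf.getOption("spark.kubernetes.initcontainer.remotePyFiles")
val uris: Seq[String] = remotePyFiles.map(_.split(",").toSeq).getOrElse(Seq.empty)
```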