Python Bindings for launching PySpark Jobs from the JVM (v1) #351
Changes from 1 commit
SparkSubmit.scala:

@@ -335,8 +335,8 @@ object SparkSubmit {
     (clusterManager, deployMode) match {
       case (KUBERNETES, CLIENT) =>
         printErrorAndExit("Client mode is currently not supported for Kubernetes.")
-      case (KUBERNETES, CLUSTER) if args.isPython || args.isR =>
-        printErrorAndExit("Kubernetes does not currently support python or R applications.")
+      case (KUBERNETES, CLUSTER) if args.isR =>
+        printErrorAndExit("Kubernetes does not currently support R applications.")
       case (STANDALONE, CLUSTER) if args.isPython =>
         printErrorAndExit("Cluster deploy mode is currently not supported for python " +
           "applications on standalone clusters.")

@@ -620,8 +620,15 @@ object SparkSubmit {
     if (isKubernetesCluster) {
       childMainClass = "org.apache.spark.deploy.kubernetes.submit.Client"
-      childArgs += args.primaryResource
-      childArgs += args.mainClass
+      if (args.isPython) {
+        childArgs += args.primaryResource
+        childArgs += "org.apache.spark.deploy.PythonRunner"
+        childArgs += args.pyFiles
+      }
+      else {
+        childArgs += args.primaryResource
+        childArgs += args.mainClass
+      }
       childArgs ++= args.childArgs
     }

Review comment (on the `else`): I think scala-style expected to be `} else {`
Reply: Noted
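For orientation, here is a minimal sketch of the handoff this change creates; the submit command and values below are made up for illustration:

// Hypothetical submission: spark-submit --py-files deps.py local:///opt/app.py 100
// With the branch above, SparkSubmit hands Client.main an args array shaped like:
val childArgs = Seq(
  "local:///opt/app.py",                   // args(0): the primary resource
  "org.apache.spark.deploy.PythonRunner",  // args(1): the main class, swapped for python apps
  "deps.py",                               // args(2): the --py-files list
  "100")                                   // remaining entries: the user's application arguments

Note that Client.main takes appArgs = args.drop(2), so for a python submission appArgs(0) is the --py-files list; this is why Client.run later drops it from the driver arguments.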
Client.scala:

@@ -47,11 +47,14 @@ private[spark] class Client(
     appName: String,
     kubernetesResourceNamePrefix: String,
     kubernetesAppId: String,
+    mainAppResource: String,
+    isPython: Boolean,
     mainClass: String,
     sparkConf: SparkConf,
     appArgs: Array[String],
     sparkJars: Seq[String],
     sparkFiles: Seq[String],
+    pySparkFiles: List[String],
     waitForAppCompletion: Boolean,
     kubernetesClient: KubernetesClient,
     initContainerComponentsProvider: DriverInitContainerComponentsProvider,

@@ -83,7 +86,14 @@ private[spark] class Client(
   def run(): Unit = {
     validateNoDuplicateFileNames(sparkJars)
     validateNoDuplicateFileNames(sparkFiles)
+    if (isPython) { validateNoDuplicateFileNames(pySparkFiles) }
+    val arguments = if (isPython) pySparkFiles match {
+      case Nil => appArgs
+      case a :: b => a match {
+        case _ if a == mainAppResource && b == Nil => appArgs
+        case _ => appArgs.drop(1)
+      }
+    } else appArgs
     val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf(
       sparkConf,
       KUBERNETES_DRIVER_LABEL_PREFIX,

Review comment (on the `arguments` match): Wonder if this could be factored to scale out cleaner. For example, if we add R next, is that going to be a new 3rd layer of nesting? See the sketch just below.
Reply: I don't see --r-files as a submission type so it is something to look into.
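A sketch of the kind of factoring the comment hints at; the names below (LanguageMode, JvmMode, PythonMode) are hypothetical and not part of this PR:

// Hypothetical refactor: model the submission language explicitly so adding R
// becomes a new case rather than a third layer of nested matches.
sealed trait LanguageMode {
  def driverArguments(appArgs: Array[String]): Array[String]
}
case object JvmMode extends LanguageMode {
  def driverArguments(appArgs: Array[String]): Array[String] = appArgs
}
case class PythonMode(mainAppResource: String, pySparkFiles: List[String]) extends LanguageMode {
  def driverArguments(appArgs: Array[String]): Array[String] = pySparkFiles match {
    case Nil => appArgs
    case resource :: Nil if resource == mainAppResource => appArgs
    case _ => appArgs.drop(1)  // drop the --py-files list that rides in as appArgs(0)
  }
}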
@@ -135,7 +145,7 @@ private[spark] class Client(
           .endEnv()
         .addNewEnv()
           .withName(ENV_DRIVER_ARGS)
-          .withValue(appArgs.mkString(" "))
+          .withValue(arguments.mkString(" "))
           .endEnv()
         .withNewResources()
           .addToRequests("cpu", driverCpuQuantity)
@@ -204,7 +214,7 @@ private[spark] class Client(
     val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map {
       case (confKey, confValue) => s"-D$confKey=$confValue"
     }.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("")
-    val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec()
+    val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec()
       .editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName))
       .addNewEnv()
         .withName(ENV_MOUNTED_CLASSPATH)
@@ -216,7 +226,15 @@ private[spark] class Client(
           .endEnv()
         .endContainer()
       .endSpec()
-      .build()
+    val resolvedDriverPod = if (!isPython) {
+      resolvedDriverPodBuilder.build()
+    } else {
+      initContainerComponentsProvider
+        .provideDriverPodFileMounter()
+        .addPySparkFiles(
+          mainAppResource, pySparkFiles, driverContainer.getName, resolvedDriverPodBuilder)
+        .build()
+    }
     Utils.tryWithResource(
       kubernetesClient
         .pods()

Review comment (on the isPython branch): Can we try to boil all of the logic guarded by the isPython flag down into one place? See the sketch just below.
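One way to act on that comment, sketched under the assumption that a no-op mounter is acceptable for JVM apps; NoOpFileMounter is hypothetical and not in this PR:

// Hypothetical: a mounter that leaves the pod untouched, so the pod-assembly
// call site can always go through addPySparkFiles and drop the isPython branch.
private[spark] class NoOpFileMounter extends DriverPodKubernetesFileMounter {
  override def addPySparkFiles(
      mainAppResource: String,
      pythonFiles: List[String],
      mainContainerName: String,
      originalPodSpec: PodBuilder): PodBuilder = originalPodSpec
}
// The components provider would return NoOpFileMounter for JVM apps, and Client.run
// could then build the pod with a single unconditional chain:
//   initContainerComponentsProvider.provideDriverPodFileMounter()
//     .addPySparkFiles(mainAppResource, pySparkFiles, driverContainer.getName, builder)
//     .build()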
@@ -266,30 +284,36 @@ private[spark] class Client(
     }
   }

-private[spark] object Client {
+private[spark] object Client{
   def main(args: Array[String]): Unit = {
     val sparkConf = new SparkConf(true)
     val mainAppResource = args(0)
     val mainClass = args(1)
     val appArgs = args.drop(2)
     run(sparkConf, mainAppResource, mainClass, appArgs)
   }

   def run(
       sparkConf: SparkConf,
       mainAppResource: String,
       mainClass: String,
       appArgs: Array[String]): Unit = {
-    val sparkJars = sparkConf.getOption("spark.jars")
+    val isPython = mainAppResource.endsWith(".py")
+    val sparkJars = if (isPython) Array.empty[String] else {
       sparkConf.getOption("spark.jars")
         .map(_.split(","))
         .getOrElse(Array.empty[String]) ++
         Option(mainAppResource)
           .filterNot(_ == SparkLauncher.NO_RESOURCE)
-          .toSeq
+          .toSeq }

Review comment (on isPython): Alluding to a remark from earlier - we might want to treat these arguments differently. For example we could take a command line argument that is a "language mode" and expect SparkSubmit to give us the right language mode and handle accordingly - e.g. Scala, Python, R. We have control over the arguments that ...
Reply: True, but R will also have MainResource be the R file, and there are no --r-files. So the arguments logic is only for Python. I think it is simple enough that refactoring the arguments might not be necessary. Something to consider, but I agree with what you are saying.
Reply: I'm more wary of the fact that we're matching against ...
Reply: The reason I match on Nil is because the appArgs being passed in are: ...

Review comment (on sparkJars): Would it be the case that people upload spark.jars when running a PySpark job?
Reply: It could happen for SQL UDFs, perhaps?
Reply: I think staging jars is a possible use case, although I have never done it. Seems OK to allow the jar list to be set non-empty for now.
     val launchTime = System.currentTimeMillis
     val sparkFiles = sparkConf.getOption("spark.files")
       .map(_.split(","))
       .getOrElse(Array.empty[String])
+    val pySparkFiles: Array[String] = if (isPython) {
+      appArgs(0) match {
+        case null => Array(mainAppResource)
+        case _ => mainAppResource +: appArgs(0).split(",")
+      }} else {Array.empty[String]}
     val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     // The resource name prefix is derived from the application name, making it easy to connect the
     // names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the

Review comment (on pySparkFiles): I'm starting to wonder if we want our arguments to be similar to CLI arguments. We can then adjust ... For example, we could expect our main method to have arguments like this: ... (a hypothetical sketch follows below)
Reply: Basically we can reformat ...
Reply: @ifilonenko thoughts on this?

Review comment (on `case null`): keying in on a ...
Reply: Noted
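The reviewer's concrete argument layout is elided above; purely as a hypothetical illustration, flag-style arguments could replace the positional contract and the null check on appArgs(0):

// Hypothetical flag parser, not part of the PR: SparkSubmit would then pass
// e.g. Array("--primary-py-file", "app.py", "--other-py-files", "a.py,b.py").
def parseFlags(args: List[String], acc: Map[String, String] = Map.empty): Map[String, String] =
  args match {
    case flag :: value :: rest if flag.startsWith("--") => parseFlags(rest, acc + (flag -> value))
    case _ => acc
  }

val parsed = parseFlags(List("--primary-py-file", "app.py", "--other-py-files", "a.py,b.py"))
// parsed("--primary-py-file") == "app.py"; no null-keyed positional slot is needed.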
@@ -302,12 +326,17 @@ private[spark] object Client {
     val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
     val master = resolveK8sMaster(sparkConf.get("spark.master"))
     val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
+    // No reason to distribute python files that are locally baked into the Docker image
+    def filterByFile(pFiles: Array[String]) : Array[String] = {
+      val LocalPattern = "(local://)(.*)"
+      pFiles.filter(fi => !(fi matches LocalPattern))
+    }
     val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
       sparkConf,
       kubernetesResourceNamePrefix,
       namespace,
       sparkJars,
-      sparkFiles,
+      sparkFiles ++ filterByFile(pySparkFiles),
       sslOptionsProvider.getSslOptions)
     Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient(
       master,

Review comment (on LocalPattern): Is local:// the only pattern we need to filter out here?
Reply: yes, if there are other patterns, let me know
@@ -328,11 +357,14 @@ private[spark] object Client {
       appName,
       kubernetesResourceNamePrefix,
       kubernetesAppId,
+      mainAppResource,
+      isPython,
       mainClass,
       sparkConf,
       appArgs,
       sparkJars,
       sparkFiles,
+      pySparkFiles.toList,
       waitForAppCompletion,
       kubernetesClient,
       initContainerComponentsProvider,
DriverPodKubernetesFileMounter.scala (new file):

@@ -0,0 +1,74 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.kubernetes.submit

import io.fabric8.kubernetes.api.model.{Container, PodBuilder}

import org.apache.spark.deploy.kubernetes.constants._
/**
 * Trait that is responsible for providing full file-paths dynamically after
 * the filesDownloadPath has been defined. The file-names are then stored in the
 * environment variables in the driver pod.
 */
private[spark] trait DriverPodKubernetesFileMounter {
  def addPySparkFiles(mainAppResource: String, pythonFiles: List[String],
    mainContainerName: String, originalPodSpec: PodBuilder) : PodBuilder
}

Review comment (on the scaladoc): Given this description, can we use ContainerLocalizedFileResolver?

Review comment (on the trait): With the V2 file staging server, do we need special code for staging python files?
Reply: They're just added to ...
Reply: I think there's a legitimate question of if we want these files to be deployed in the same location as where the ...

Review comment (on addPySparkFiles): This seems to be python specific, but its trait name is not. Should the trait name include ...
Reply: This will be a more general file mounter for rfiles as well.
Reply: Have you considered including this in ...
Reply: Never mind - I misunderstood the name of the class and thought it was also uploading the files to the resource staging server as well.

private[spark] class DriverPodKubernetesFileMounterImpl(filesDownloadPath: String)
  extends DriverPodKubernetesFileMounter {
  val LocalPattern = "(local://)(.*)".r
  val FilePattern = "(file:/)(.*)".r
  def getName(file: String, separatorChar: Char) : String = {
    val index: Int = file.lastIndexOf(separatorChar)
    file.substring(index + 1)
  }
  def fileLoc(file: String) : String = file match {
    case "" => ""
    case LocalPattern(_, file_name) => file_name
    case FilePattern(_, file_name) => filesDownloadPath + "/" + getName(file_name, '/')
    case _ => filesDownloadPath + "/" + getName(file, '/')
  }
  def pythonFileLocations(pFiles: List[String], mainAppResource: String) : String = {
    def recFileLoc(file: List[String]): List[String] = file match {
      case Nil => List.empty[String]
      case a :: b => a match {
        case _ if a == mainAppResource => recFileLoc(b)
        case _ => fileLoc(a) +: recFileLoc(b)
      }
    }
    recFileLoc(pFiles).mkString(",")
  }
  override def addPySparkFiles(mainAppResource: String, pythonFiles: List[String],
      mainContainerName: String,
      originalPodSpec: PodBuilder): PodBuilder = {
    originalPodSpec
      .editSpec()
        .editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
          .addNewEnv()
            .withName(ENV_PYSPARK_PRIMARY)
            .withValue(fileLoc(mainAppResource))
            .endEnv()
          .addNewEnv()
            .withName(ENV_PYSPARK_FILES)
            .withValue(pythonFileLocations(pythonFiles, mainAppResource))
            .endEnv()
        .endContainer()
      .endSpec()
  }
}

Review comment (on the class): I like the idea of making this generic - we might want to put the submitted jars in here too. It's worth noting that for the next refactor pass.
Reply: In the above case, I only load file:// into spark-files, but resolve paths and mount for the purpose of the docker image environment variables using this Trait.

Review comment (on LocalPattern): We've been using ...
Reply: Noted

Review comment (on pythonFileLocations): These operations are mostly shared with ContainerLocalizedFileResolver. Can we use that instead?
Reply: I have refactored to do this

Review comment (on the addPySparkFiles signature): Indentation here - put each argument on its own line. See argument lists for the constructor of ...
Reply: noted

Review comment (on the builder chain): We've been indenting these to make it easier to track where the objects begin and end. See for example ...
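A usage sketch of the resolution rules above; the download path and file URIs are made-up example values:

val mounter = new DriverPodKubernetesFileMounterImpl("/var/spark-data/spark-files")
mounter.fileLoc("local:///opt/spark/app.py")    // "/opt/spark/app.py": already in the image
mounter.fileLoc("file:///tmp/deps.py")          // "/var/spark-data/spark-files/deps.py"
mounter.fileLoc("hdfs://nn:8020/libs/util.py")  // "/var/spark-data/spark-files/util.py"
// pythonFileLocations joins the resolved paths, skipping the primary resource itself:
mounter.pythonFileLocations(
  List("local:///opt/spark/app.py", "file:///tmp/deps.py"),
  "local:///opt/spark/app.py")                  // "/var/spark-data/spark-files/deps.py"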
ClientV2Suite.scala:

@@ -301,11 +301,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
       APP_NAME,
       APP_RESOURCE_PREFIX,
       APP_ID,
+      null,
+      false,
       MAIN_CLASS,
       SPARK_CONF,
       APP_ARGS,
       SPARK_JARS,
       SPARK_FILES,
+      null,
       true,
       kubernetesClient,
       initContainerComponentsProvider,

Review comment (on null): Use ...
Reply: Noted. Modifying
@@ -386,11 +389,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
       APP_NAME,
       APP_RESOURCE_PREFIX,
       APP_ID,
+      null,
+      false,
       MAIN_CLASS,
       SPARK_CONF,
       APP_ARGS,
       SPARK_JARS,
       SPARK_FILES,
+      null,
       false,
       kubernetesClient,
       initContainerComponentsProvider,
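As a possible follow-up to the null comment above (a hedged sketch, not what the PR does), the optional fields could be modeled with Option so the suite passes None instead of null:

// Hypothetical signature tweak: Option makes "no python resource / no py-files" explicit.
private[spark] class Client(
    mainAppResource: Option[String],
    pySparkFiles: Option[List[String]]
    /* ...remaining constructor parameters unchanged... */ ) {
}
// The tests would then construct:
//   new Client(mainAppResource = None, pySparkFiles = None, ...)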
dockerfiles/driver-py/Dockerfile (new file):

@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

FROM spark-base

# If this docker file is being used in the context of building your images from a Spark distribution, the docker build
# command should be invoked from the top level directory of the Spark distribution. E.g.:
# docker build -t spark-driver-py:latest -f dockerfiles/driver-py/Dockerfile .

ADD examples /opt/spark/examples
ADD python /opt/spark/python

RUN apk add --no-cache python && \
    python -m ensurepip && \
    rm -r /usr/lib/python*/ensurepip && \
    pip install --upgrade pip setuptools && \
    rm -r /root/.cache
# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES
# RUN apk add --update alpine-sdk python-dev
# RUN pip install numpy

ENV PYTHON_VERSION 2.7.13
ENV PYSPARK_PYTHON python
ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_DIR} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
    exec /sbin/tini -- ${JAVA_HOME}/bin/java $SPARK_DRIVER_JAVA_OPTS -cp $SPARK_CLASSPATH \
      -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY \
      $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS

Review comment (on the CMD classpath): The classpath environment variables may mean something different in PySpark, but given that we might want to also be shipping jars for UDFs, this might still apply.
Reply: Exactly, I was confused about whether this would be necessary. This is an extension to the question about whether spark.jars should be empty.
Reply: My understanding now is that we actually do need jars for SQL UDFs. cc @robert3005
Reply: Okay, I will account for that by allowing submission of spark.jars in Client.scala.
Review comment (on the SparkSubmit childArgs branches above): nit: could factor out childArgs += args.primaryResource
Reply: Client.main reads val mainAppResource = args(0), so it is necessary to point to the mainAppResource in Client.main().