-
Notifications
You must be signed in to change notification settings - Fork 118
Python Bindings for launching PySpark Jobs from the JVM (v1) #351
Changes from 18 commits
d3cf58f
bafc13c
59d9f0a
4daf634
51105ca
bd30f40
720776e
4b5f470
1361a26
0abc3b1
0869b07
38d48ce
9bf7b9d
4561194
2cf96cc
eb1079a
4a6b779
363919a
9c7adb1
0388aa4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -47,11 +47,11 @@ private[spark] class Client( | |
appName: String, | ||
kubernetesResourceNamePrefix: String, | ||
kubernetesAppId: String, | ||
mainAppResource: String, | ||
pythonResource: Option[PythonSubmissionResourcesImpl], | ||
mainClass: String, | ||
sparkConf: SparkConf, | ||
appArgs: Array[String], | ||
sparkJars: Seq[String], | ||
sparkFiles: Seq[String], | ||
waitForAppCompletion: Boolean, | ||
kubernetesClient: KubernetesClient, | ||
initContainerComponentsProvider: DriverInitContainerComponentsProvider, | ||
|
@@ -82,9 +82,7 @@ private[spark] class Client( | |
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) | ||
|
||
def run(): Unit = { | ||
validateNoDuplicateFileNames(sparkJars) | ||
validateNoDuplicateFileNames(sparkFiles) | ||
|
||
val arguments = (pythonResource map {p => p.arguments}).getOrElse(appArgs) | ||
val driverCustomLabels = ConfigurationUtils.combinePrefixedKeyValuePairsWithDeprecatedConf( | ||
sparkConf, | ||
KUBERNETES_DRIVER_LABEL_PREFIX, | ||
|
@@ -136,7 +134,7 @@ private[spark] class Client( | |
.endEnv() | ||
.addNewEnv() | ||
.withName(ENV_DRIVER_ARGS) | ||
.withValue(appArgs.mkString(" ")) | ||
.withValue(arguments.mkString(" ")) | ||
.endEnv() | ||
.withNewResources() | ||
.addToRequests("cpu", driverCpuQuantity) | ||
|
@@ -182,10 +180,13 @@ private[spark] class Client( | |
.map(_.build()) | ||
|
||
val containerLocalizedFilesResolver = initContainerComponentsProvider | ||
.provideContainerLocalizedFilesResolver() | ||
.provideContainerLocalizedFilesResolver(mainAppResource) | ||
val resolvedSparkJars = containerLocalizedFilesResolver.resolveSubmittedSparkJars() | ||
val resolvedSparkFiles = containerLocalizedFilesResolver.resolveSubmittedSparkFiles() | ||
|
||
val resolvedPySparkFiles = containerLocalizedFilesResolver.resolveSubmittedPySparkFiles() | ||
val resolvedPrimaryPySparkResource = (pythonResource map { | ||
p => p.primaryPySparkResource(containerLocalizedFilesResolver) | ||
}).getOrElse("") | ||
val initContainerBundler = initContainerComponentsProvider | ||
.provideInitContainerBundle(maybeSubmittedResourceIdentifiers.map(_.ids()), | ||
resolvedSparkJars ++ resolvedSparkFiles) | ||
|
@@ -221,7 +222,7 @@ private[spark] class Client( | |
val resolvedDriverJavaOpts = resolvedSparkConf.getAll.map { | ||
case (confKey, confValue) => s"-D$confKey=$confValue" | ||
}.mkString(" ") + driverJavaOptions.map(" " + _).getOrElse("") | ||
val resolvedDriverPod = podWithInitContainerAndMountedCreds.editSpec() | ||
val resolvedDriverPodBuilder = podWithInitContainerAndMountedCreds.editSpec() | ||
.editMatchingContainer(new ContainerNameEqualityPredicate(driverContainer.getName)) | ||
.addNewEnv() | ||
.withName(ENV_MOUNTED_CLASSPATH) | ||
|
@@ -233,7 +234,15 @@ private[spark] class Client( | |
.endEnv() | ||
.endContainer() | ||
.endSpec() | ||
.build() | ||
val driverPodFileMounter = initContainerComponentsProvider.provideDriverPodFileMounter() | ||
val resolvedDriverPod = (pythonResource map { | ||
p => p.driverPod( | ||
driverPodFileMounter, | ||
resolvedPrimaryPySparkResource, | ||
resolvedPySparkFiles.mkString(","), | ||
driverContainer.getName, | ||
resolvedDriverPodBuilder | ||
)}).getOrElse(resolvedDriverPodBuilder.build()) | ||
Utils.tryWithResource( | ||
kubernetesClient | ||
.pods() | ||
|
@@ -271,17 +280,6 @@ private[spark] class Client( | |
} | ||
} | ||
} | ||
|
||
private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = { | ||
val fileNamesToUris = allFiles.map { file => | ||
(new File(Utils.resolveURI(file).getPath).getName, file) | ||
} | ||
fileNamesToUris.groupBy(_._1).foreach { | ||
case (fileName, urisWithFileName) => | ||
require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" + | ||
s" file name $fileName is shared by all of these URIs: $urisWithFileName") | ||
} | ||
} | ||
} | ||
|
||
private[spark] object Client { | ||
|
@@ -292,22 +290,34 @@ private[spark] object Client { | |
val appArgs = args.drop(2) | ||
run(sparkConf, mainAppResource, mainClass, appArgs) | ||
} | ||
|
||
def run( | ||
sparkConf: SparkConf, | ||
mainAppResource: String, | ||
mainClass: String, | ||
appArgs: Array[String]): Unit = { | ||
val isPython = mainAppResource.endsWith(".py") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Alluding to a remark from earlier - we might want to treat these arguments differently. For example, we could take a command line argument that is a "language mode" and expect SparkSubmit to give us the right language mode and handle accordingly - e.g. Scala, Python, R. We have control over the arguments that … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. True, but R will also have MainResource be the Python file, but there are no --r-files. So arguments logic is only for Python. I think it is simple enough that refactoring the arguments might not be necessary. Something to consider, but I agree with what you are saying. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm more wary of the fact that we're matching against … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The reason I match on Nil is because the appArgs being passed in are: … |
||
val pythonResource: Option[PythonSubmissionResourcesImpl] = | ||
if (isPython) { | ||
Option(new PythonSubmissionResourcesImpl(mainAppResource, appArgs)) | ||
} else None | ||
// Since you might need jars for SQL UDFs in PySpark | ||
def sparkJarFilter() : Seq[String] = (pythonResource map { | ||
p => p.sparkJars}).getOrElse( | ||
Option(mainAppResource) | ||
.filterNot(_ == SparkLauncher.NO_RESOURCE) | ||
.toSeq) | ||
val sparkJars = sparkConf.getOption("spark.jars") | ||
.map(_.split(",")) | ||
.getOrElse(Array.empty[String]) ++ | ||
Option(mainAppResource) | ||
.filterNot(_ == SparkLauncher.NO_RESOURCE) | ||
.toSeq | ||
.getOrElse(Array.empty[String]) ++ sparkJarFilter() | ||
val launchTime = System.currentTimeMillis | ||
val sparkFiles = sparkConf.getOption("spark.files") | ||
.map(_.split(",")) | ||
.getOrElse(Array.empty[String]) | ||
val pySparkFilesOption = pythonResource map {p => p.pySparkFiles} | ||
validateNoDuplicateFileNames(sparkJars) | ||
validateNoDuplicateFileNames(sparkFiles) | ||
pySparkFilesOption foreach {b => validateNoDuplicateFileNames(b)} | ||
val pySparkFiles = pySparkFilesOption.getOrElse(Array.empty[String]) | ||
val appName = sparkConf.getOption("spark.app.name").getOrElse("spark") | ||
// The resource name prefix is derived from the application name, making it easy to connect the | ||
// names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the | ||
|
@@ -326,6 +336,7 @@ private[spark] object Client { | |
namespace, | ||
sparkJars, | ||
sparkFiles, | ||
pySparkFiles, | ||
sslOptionsProvider.getSslOptions) | ||
Utils.tryWithResource(SparkKubernetesClientFactory.createKubernetesClient( | ||
master, | ||
|
@@ -346,16 +357,26 @@ private[spark] object Client { | |
appName, | ||
kubernetesResourceNamePrefix, | ||
kubernetesAppId, | ||
mainAppResource, | ||
pythonResource, | ||
mainClass, | ||
sparkConf, | ||
appArgs, | ||
sparkJars, | ||
sparkFiles, | ||
waitForAppCompletion, | ||
kubernetesClient, | ||
initContainerComponentsProvider, | ||
kubernetesCredentialsMounterProvider, | ||
loggingPodStatusWatcher).run() | ||
} | ||
} | ||
// Verifies that no two entries of `allFiles` share the same file name. | ||
// Each entry is reduced to the last path component (`File.getName`) of the path of | ||
// `Utils.resolveURI(file)` and paired with the original string; the pairs are then grouped | ||
// by that name and every group must contain exactly one pair. | ||
// `require` throws IllegalArgumentException for an offending group; the message contains the | ||
// shared file name followed by the group's (fileName, originalEntry) pairs. | ||
// NOTE(review): presumably Utils.resolveURI turns plain local paths into URIs - confirm. | ||
private def validateNoDuplicateFileNames(allFiles: Seq[String]): Unit = { | ||
val fileNamesToUris = allFiles.map { file => | ||
(new File(Utils.resolveURI(file).getPath).getName, file) | ||
} | ||
fileNamesToUris.groupBy(_._1).foreach { | ||
case (fileName, urisWithFileName) => | ||
require(urisWithFileName.size == 1, "Cannot add multiple files with the same name, but" + | ||
s" file name $fileName is shared by all of these URIs: $urisWithFileName") | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could factor out
childArgs += args.primaryResource
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val mainAppResource = args(0)
so it is necessary to point to the mainAppResource in Client.main()