This repository has been archived by the owner on Jan 9, 2020. It is now read-only.

Python Bindings for launching PySpark Jobs from the JVM (v1) #351

Conversation

@ifilonenko (Member) commented Jun 16, 2017

What changes were proposed in this pull request?

This pull request proposes the following changes:

  1. Use separate Docker images, built on top of spark-base, for PySpark jobs.
    These images differ in the inclusion of Python and PySpark-specific environment variables. The user entry point also differs for driver-py, as you must include the location of the primary PySpark file and the distributed py-files in addition to the driver args.
  2. Add a new FileMountingTrait that is generic enough for both SparkR and PySpark to handle passing the proper arguments to PythonRunner and RRunner (see the sketch after this list). This FileMounter uses the filesDownloadPath resolved in the DriverInitComponent to ensure that file paths are correct. These file paths are stored as environment variables that are mounted on the driver pod.
  3. Include integration tests for PySpark (TODO: build an environment identical to the distribution Python environment to run the tests).
  4. Add match statements to account for varying arguments and malformed inputs, which may include nulls or a mix of local:// and file:// URIs.
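
For reference, a minimal sketch of the shape of the proposed trait (named DriverPodKubernetesFileMounter in the diff; the signature below mirrors what appears later in this conversation):

private[spark] trait DriverPodKubernetesFileMounter {
  def addPySparkFiles(
      mainAppResource: String,
      pythonFiles: List[String],
      mainContainerName: String,
      originalPodSpec: PodBuilder): PodBuilder
}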

Example Spark Submit

This is an example spark-submit that uses the custom PySpark Docker images and distributes the staged sort.py file across the cluster. The entry point for the driver is:
org.apache.spark.deploy.PythonRunner <FILE_DOWNLOADS_PATH>/pi.py <FILE_DOWNLOADS_PATH>/sort.py 100

bin/spark-submit \
  --deploy-mode cluster \
  --master k8s://<k8s-api-url> \
  --kubernetes-namespace default \
  --conf spark.executor.memory=500m \
  --conf spark.driver.memory=1G \
  --conf spark.driver.cores=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.instances=1 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
  --conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
  --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
  --conf spark.kubernetes.resourceStagingServer.uri=http://192.168.99.100:31000 \
  --py-files examples/src/main/python/sort.py \
  examples/src/main/python/pi.py 100

How was this patch tested?

This was tested end-to-end by building a distribution with make-distribution.sh and running on a local minikube cluster with a single executor. The following commands show an example submission:

$ build/mvn install -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
$ build/mvn compile -T 4C -Pkubernetes -pl resource-managers/kubernetes/core -am -DskipTests
$ dev/make-distribution.sh --pip --tgz -Phadoop-2.7 -Pkubernetes
$ tar -xvf spark-2.1.0-k8s-0.2.0-SNAPSHOT-bin-2.7.3.tgz
$ cd spark-2.1.0-k8s-0.2.0-SNAPSHOT-bin-2.7.3
$ minikube start --insecure-registry=localhost:5000 --cpus 8 --disk-size 20g --memory 8000 --kubernetes-version v1.5.3; eval $(minikube docker-env)
$ # Build all docker images using docker build ....
$ # Make sure staging server is up 
$ kubectl cluster-info
Kubernetes master is running at https://192.168.99.100:8443
KubeDNS is running at https://192.168.99.100:8443/api/v1/proxy/namespaces/kube-system/services/kube-dns
kubernetes-dashboard is running at https://192.168.99.100:8443/api/v1/proxy/namespaces/kube-system/services/kubernetes-dashboard
$ docker images
REPOSITORY                                          
spark-integration-test-asset-server                 
spark-init                                           
spark-resource-staging-server                         
spark-shuffle                                      
spark-executor-py                                    
spark-executor                                      
spark-driver-py                                      
spark-driver                                        
spark-base                                         
$ bin/spark-submit \
  --deploy-mode cluster \
  --master k8s://https://192.168.99.100:8443 \
  --kubernetes-namespace default \
  --conf spark.executor.memory=500m \
  --conf spark.driver.memory=1G \
  --conf spark.driver.cores=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.instances=1 \
  --conf spark.app.name=spark-pi \
  --conf spark.kubernetes.driver.docker.image=spark-driver-py:latest \
  --conf spark.kubernetes.executor.docker.image=spark-executor-py:latest \
  --conf spark.kubernetes.initcontainer.docker.image=spark-init:latest \
  --conf spark.kubernetes.resourceStagingServer.uri=http://192.168.99.100:31000 \
  --py-files examples/src/main/python/sort.py \
  local:///opt/spark/examples/src/main/python/pi.py 100

Integration and unit tests have been added.

Future Versions of this PR

Launching JVM from Python (log issue)
MemoryOverhead testing (OOMKilled errors)

def run(
sparkConf: SparkConf,
mainAppResource: String,
mainClass: String,
appArgs: Array[String]): Unit = {
val sparkJars = sparkConf.getOption("spark.jars")
val isPython = mainAppResource.endsWith(".py")
val sparkJars = if (isPython) Array.empty[String] else {
Member Author:

Would it be the case that people upload spark.jars when running a PySpark job?

Reviewer:

It could happen for SQL UDFs, perhaps?

Member:

I think staging jars is a possible use case, although I have never done it. Seems OK to allow jar list to be set non-empty for now

@mccheah commented Jun 16, 2017

(y) This is on my radar to look at - thanks a lot for submitting this.

</resource>
</resources>
</configuration>
</execution>
@ifilonenko (Member Author), Jun 16, 2017:

This is the part that is incomplete for making fully fledged integration tests. It seems that we need to mimic the environment created by make-distribution.sh, which is done with the following script:

# Make pip package
if [ "$MAKE_PIP" == "true" ]; then
  echo "Building python distribution package"
  pushd "$SPARK_HOME/python" > /dev/null
  python setup.py sdist
  popd > /dev/null
else
  echo "Skipping building python distribution package"
fi

A similar environment will need to be mimicked for testing R bindings. Any recommendations would be great :)

Reviewer:

What do the Pyspark tests do?

Member Author:

They modify the mainClass to PythonRunner and pass in custom arguments to test against a locally baked PySpark test file. The file in the test environment is pi.py.

Member Author:

Further tests I wish to add include calling other Python files, but I am bottlenecked by the environment at the moment.

@erikerlandson (Member):

IIRC, you had concerns about additional container size - but should we consider folding the python-specific images into the standard spark images, to avoid the need for specifying special images? What is the size impact?

@ifilonenko (Member Author):

@erikerlandson That could be something to look into. The Python environment that I am loading in more than doubles the size of the driver image, from the original 258 MB to 573 MB.

childArgs += args.primaryResource
childArgs += args.mainClass
if (args.isPython) {
childArgs += args.primaryResource
Member:

nit: could factor out childArgs += args.primaryResource

Member Author:

val mainAppResource = args(0)
so it is necessary to point to the mainAppResource in Client.main()

childArgs += "org.apache.spark.deploy.PythonRunner"
childArgs += args.pyFiles
}
else {
Member:

I think Scala style expects } else { on a single line.

Member Author:

Noted

@@ -83,7 +86,14 @@ private[spark] class Client(
def run(): Unit = {
validateNoDuplicateFileNames(sparkJars)
validateNoDuplicateFileNames(sparkFiles)

if (isPython) {validateNoDuplicateFileNames(pySparkFiles)}
val arguments = if (isPython) pySparkFiles match {
Member:

I wonder if this could be factored to scale out more cleanly. For example, if we add R next, is that going to be a new, third layer of arguments?

Member Author:

I don't see --r-files as a submission type so it is something to look into.

def run(
sparkConf: SparkConf,
mainAppResource: String,
mainClass: String,
appArgs: Array[String]): Unit = {
val sparkJars = sparkConf.getOption("spark.jars")
val isPython = mainAppResource.endsWith(".py")
val sparkJars = if (isPython) Array.empty[String] else {
Member:

I think staging jars is a possible use case, although I have never done it. Seems OK to allow jar list to be set non-empty for now

val launchTime = System.currentTimeMillis
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val pySparkFiles: Array[String] = if (isPython) {
appArgs(0) match {
case null => Array(mainAppResource)
Member:

Keying in on a null return is a Scala no-no. I recommend if (appArgs.isEmpty) ... or some other test.
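
For instance, something along these lines (a sketch only, reusing the names from this diff) avoids touching null directly:

val pySparkFiles: Array[String] =
  appArgs.headOption
    .flatMap(Option(_))                               // covers both an empty array and a null first element
    .map(files => mainAppResource +: files.split(","))
    .getOrElse(Array(mainAppResource))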

Member Author:

Noted

@@ -302,12 +326,17 @@ private[spark] object Client {
val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
val master = resolveK8sMaster(sparkConf.get("spark.master"))
val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
// No reason to distribute python files that are locally baked into Docker image
def filterByFile(pFiles: Array[String]) : Array[String] = {
val LocalPattern = "(local://)(.*)"
Member:

Is local:// purely reserved for things that would already be on the image?

Member Author:

yes, if there are other patterns, let me know

* environmental variables in the driver-pod.
*/
private[spark] trait DriverPodKubernetesFileMounter {
def addPySparkFiles(mainAppResource: String, pythonFiles: List[String],
Member:

This seems to be Python-specific, but its trait name is not. Should the trait name include ...Python... somewhere? Or should this be folded into a more general file mounter?

@ifilonenko (Member Author), Jun 16, 2017:

This will be a more general file mounter for R files as well.

Reviewer:

Have you considered including this in SubmittedDependencyUploader or re-using that somehow?

Reviewer:

Never mind - I misunderstood the name of the class and thought it was also uploading the files to the resource staging server.

@@ -301,11 +301,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
APP_NAME,
APP_RESOURCE_PREFIX,
APP_ID,
null,
Member:

Use Option[T] for things that may or may not be set.

@ifilonenko (Member Author), Jun 16, 2017:

Noted. Modifying nulls to Nil and Empty String where appropriate

* the filesDownloadPath has been defined. The file-names are then stored in the
* environmental variables in the driver-pod.
*/
private[spark] trait DriverPodKubernetesFileMounter {
Member:

With the V2 file staging server, do we need special code for staging python files?
cc/ @mccheah

Reviewer:

They're just added to spark.files and that should suffice.

Reviewer:

I think there's a legitimate question of whether we want these files to be deployed in the same location as spark.files. I think this is fine for now, but we have split the jars out, so it seems strange that we're splitting in one instance but bundling into the same directory in this case.

@foxish (Member) commented Jun 16, 2017

Really excited to see this. Thanks @ifilonenko!

@kimoonkim (Member):

Yes, an awesome contribution! @ifilonenko


private[spark] class DriverPodKubernetesFileMounterImpl(filesDownloadPath: String)
extends DriverPodKubernetesFileMounter {
val LocalPattern = "(local://)(.*)".r
Reviewer:

We've been using Utils.resolveURI(uri).getScheme match - see KubernetesFileUtils.
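
For example, a scheme-based check could look roughly like this (a sketch; isContainerLocal is a hypothetical helper name):

import org.apache.spark.util.Utils

// Treat a URI as "already on the image" only when its scheme is local://
def isContainerLocal(file: String): Boolean = {
  Option(Utils.resolveURI(file).getScheme).contains("local")
}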

Member Author:

Noted

@mccheah left a comment:

The main change I would like to see is to try to merge the container file resolution logic with ContainerLocalizedFilesResolver.

.map(_.split(","))
.getOrElse(Array.empty[String]) ++
Option(mainAppResource)
.filterNot(_ == SparkLauncher.NO_RESOURCE)
.toSeq
.toSeq }
Reviewer:

} should go on the next line

val launchTime = System.currentTimeMillis
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val pySparkFiles: Array[String] = if (isPython) {
Reviewer:

I'm starting to wonder if we want our arguments to be similar to CLI arguments. We can then adjust SparkSubmit.scala to conform to the contract here.

For example, we could expect our main method to have arguments like this:

org.apache.spark.deploy.kubernetes.Client --primary-resource <resource> --py-files <pyfiles>.

Reviewer:

Basically we can reformat Client.main's contract to expect named arguments, make SparkSubmit pass us named arguments, and parse them here accordingly.
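
A rough sketch of what such parsing could look like (hypothetical; the flag names and helper are illustrative, not the current contract):

// Turn ["--primary-resource", "<resource>", "--py-files", "<pyfiles>"] into a map of flag -> value
def parseNamedArgs(args: Array[String]): Map[String, String] = {
  args.sliding(2, 2).collect {
    case Array(flag, value) if flag.startsWith("--") => flag.stripPrefix("--") -> value
  }.toMap
}

val parsed = parseNamedArgs(args)
val primaryResource = parsed("primary-resource")
val pyFiles = parsed.get("py-files").map(_.split(",").toSeq).getOrElse(Seq.empty[String])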

Reviewer:

@ifilonenko thoughts on this?

/**
* Trait that is responsible for providing full file-paths dynamically after
* the filesDownloadPath has been defined. The file-names are then stored in the
* environmental variables in the driver-pod.
Reviewer:

Given this description, can we use ContainerLocalizedFileResolver?

Reviewer:

FileMounter is also a misleading name since this doesn't actually mount the files themselves, but rather it resolves the paths to them.

* environmental variables in the driver-pod.
*/
private[spark] trait DriverPodKubernetesFileMounter {
def addPySparkFiles(mainAppResource: String, pythonFiles: List[String],
Reviewer:

Never mind - I misunderstood the name of the class and thought it was also uploading the files to the resource staging server.

@@ -301,11 +301,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
APP_NAME,
APP_RESOURCE_PREFIX,
APP_ID,
"",
Reviewer:

We should add tests that specifically check the Python logic.

Member Author:

Added unit tests

Reviewer:

Can you update ClientV2Suite to ensure that we're getting an instance of the file mounter and using it to mount the PySpark files?

case FilePattern(_, file_name) => filesDownloadPath + "/" + getName(file_name, '/')
case _ => filesDownloadPath + "/" + getName(file, '/')
}
def pythonFileLocations(pFiles: List[String], mainAppResource: String) : String = {
Reviewer:

These operations are mostly shared with ContainerLocalizedFileResolver. Can we use that instead?

Member Author:

I have refactored to do this

recFileLoc(pFiles).mkString(",")
}
override def addPySparkFiles(mainAppResource: String, pythonFiles: List[String],
mainContainerName: String,
Reviewer:

Indentation here - put each argument on its own line. See argument lists for the constructor of DriverInitContainerComponentsProviderImpl.

Member Author:

noted

originalPodSpec: PodBuilder): PodBuilder = {
originalPodSpec
.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(mainContainerName))
Reviewer:

We've been indenting these to make it easier to track where the objects begin and end. See for example SparkPodInitContainerBootstrapImpl.

ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}

CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
Reviewer:

The classpath environment variables may mean something different in PySpark, but given that we might also want to ship jars for UDFs, this might still apply.

Member Author:

Exactly, I was confused about whether this would be necessary. This is an extension of the question about whether spark.jars should be empty.

@mccheah, Jun 19, 2017:

My understanding now is that we actually do need jars for SQL UDFs. cc @robert3005

Member Author:

Okay I will account for that by allowing submission of spark.jars in Client.scala

val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
val SPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
".integrationtest.jobs.SparkPiWithInfiniteWait"
val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION = "local:///opt/spark/" +
"examples/src/main/python/pi.py"
Reviewer:

Move the entire string to this line.

Member Author:

noted

@ifilonenko (Member Author):

rerun integration test please

@mccheah left a comment:

Keep in mind that I'm glossing over some of the details of the submission client design because as discussed in the SIG meeting this morning, it probably needs to be refactored and restructured anyways.

val exitCode = process.waitFor()
if (exitCode != 0) {
// scalastyle:off println
println(s"exitCode: $exitCode")
Reviewer:

We can use logging here also.

.getOrElse("/usr/bin/python")
val builder = new ProcessBuilder(
Seq(pythonExec, "setup.py", "sdist").asJava)
builder.directory(new java.io.File(s"$DOCKER_BUILD_PATH/python"))
Reviewer:

Use new File(DOCKER_BUILD_PATH.toFile(), "python").

test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH))
Reviewer:

A more interesting test could be to put a jar on the classpath that contains a UDF that the job needs. That could be left for follow-up work though.


launchStagingServer(SSLOptions(), None)
sparkConf.set(DRIVER_DOCKER_IMAGE,
System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
Reviewer:

Chain the .set calls together.
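
i.e., something like the following (a sketch; EXECUTOR_DOCKER_IMAGE and the executor property name are assumed counterparts of the driver ones shown above):

sparkConf
  .set(DRIVER_DOCKER_IMAGE,
    System.getProperty("spark.docker.test.driverImage", "spark-driver-py:latest"))
  .set(EXECUTOR_DOCKER_IMAGE,
    System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))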

val launchTime = System.currentTimeMillis
val sparkFiles = sparkConf.getOption("spark.files")
.map(_.split(","))
.getOrElse(Array.empty[String])
val pySparkFiles: Array[String] = if (isPython) {
Reviewer:

@ifilonenko thoughts on this?

.map(_.split(","))
.getOrElse(Array.empty[String]) ++
Option(mainAppResource)
val isPython = mainAppResource.endsWith(".py")
Reviewer:

Alluding to a remark from earlier - we might want to treat these arguments differently. For example we could take a command line argument that is a "language mode" and expect SparkSubmit to give us the right language mode and handle accordingly - e.g. Scala, Python, R. We have control over the arguments that SparkSubmit.scala sends us and so we should encode the arguments clearly if we can.

Member Author:

True, but R will also have MainResource be the Python file, yet there are no --r-files, so the arguments logic is only for Python. I think it is simple enough that refactoring the arguments might not be necessary. Something to consider, but I agree with what you are saying.

Reviewer:

I'm more wary of the fact that we're matching against Nil here when we could be doing this in a type-safe way.

Member Author:

The reason I match on Nil is that the appArgs being passed in are (null 500) from spark-submit, because --py-files is null. The first value will always be the --py-files, and the rest are the arguments passed into the file itself. I don't see problems with that, exactly.

}

private[spark] class DriverPodKubernetesFileMounterImpl()
extends DriverPodKubernetesFileMounter {
Reviewer:

I like the idea of making this generic - we might want to put the submitted jars in here too. It's worth noting that for the next refactor pass.

Member Author:

In the above case, I only load file:// files into spark.files, but I resolve paths and mount them, for the purpose of the Docker image environment variables, using this trait.

@@ -301,11 +301,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
APP_NAME,
APP_RESOURCE_PREFIX,
APP_ID,
"",
Reviewer:

Can you update ClientV2Suite to ensure that we're getting an instance of the file mounter and using it to mount the PySpark files?

* the filesDownloadPath has been defined. The file-names are then stored in the
* environmental variables in the driver-pod.
*/
private[spark] trait DriverPodKubernetesFileMounter {
Reviewer:

They're just added to spark.files and that should suffice.

* the filesDownloadPath has been defined. The file-names are then stored in the
* environmental variables in the driver-pod.
*/
private[spark] trait DriverPodKubernetesFileMounter {
Reviewer:

I think there's a legitimate question of whether we want these files to be deployed in the same location as spark.files. I think this is fine for now, but we have split the jars out, so it seems strange that we're splitting in one instance but bundling into the same directory in this case.

@mccheah commented Jun 23, 2017

rerun integration tests please

@ifilonenko (Member Author):

rerun integration tests please

@ifilonenko (Member Author):

rerun unit tests please

override def answer(invocation: InvocationOnMock) : PodBuilder = {
invocation.getArgumentAt(3, classOf[PodBuilder])
.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(
Reviewer:

There's no need to write out all of the specific Pyspark logic here. We should prefer making this part of the test as simple as possible, and only checking the specifics in the file mounter suite.

How about just having this answer:

override def answer(invocation: InvocationOnMock): PodBuilder = {
  invocation.getArgumentAt(3, classOf[PodBuilder]).editSpec().editMetadata().addToLabels("pyspark", "true").endMetadata();
}

Then just check that the pod has the given label.
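
The corresponding assertion in the test body could then be as simple as (a sketch; createdDriverPod is an illustrative variable name for the pod the client built):

assert(createdDriverPod.getMetadata.getLabels.get("pyspark") === "true")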

Member Author:

I understand. But why not test the specific functionality that the properly resolved file names are mounted into the proper environment variables? That seems to be the entire purpose of this unit test, no?

Reviewer:

That should be tested in the file mounter unit test. The Client class doesn't decide the environment variables to set - it trusts the submodule is providing the right configurations.

any[String],
any[PodBuilder])).thenAnswer( new Answer[Pod] {
override def answer(invocation: InvocationOnMock) : Pod = {
invocation.getArgumentAt(0, classOf[DriverInitContainerComponentsProvider])
Reviewer:

Use the mock file mounter directly from here instead of calling into the init container components provider.

Member Author:

It was because PythonSubmissionResource wasn't a trait. That is a good point.

Reviewer:

It's possible to use mocks to stub classes, but it's certainly not preferred. Mocks also can't be used to stub final methods or final classes so if we're mocking classes we have to make an assumption that we aren't doing those things - best to avoid the uncertainty entirely.


import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}

class PythonSubmissionResources(
Reviewer:

Make this a trait and write a unit test. This will allow us to mock this out and simplify the ClientV2Suite.

private val mainAppResource: String,
private val appArgs: Array[String] ) {

private val pyFiles: Array[String] = Option(appArgs(0)) match {
Reviewer:

Use Option.getOrElse instead of match. Avoid matching with Options.
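
For example (a sketch, reusing the names from this diff):

private val pyFiles: Array[String] =
  Option(appArgs(0))
    .map(a => mainAppResource +: a.split(","))
    .getOrElse(Array(mainAppResource))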


def pySparkFiles: Array[String] = pyFiles

def arguments: Array[String] =
Reviewer:

Always wrap multi-line methods with curly braces, even if it's just a single statement.

resolvedPySparkFiles: String,
driverContainerName: String,
driverPodBuilder: PodBuilder) : Pod = {
initContainerComponentsProvider
Reviewer:

I'm uncertain about the design where the init container components provider is used from here. The indirection is becoming tricky to follow. Shouldn't the init container components provider be creating this class, with the addPySparkFiles method then defined on this class?

Reviewer:

We did still want to make this generic, though after seeing this I think we can make it generic later - it's probably simpler to put everything Python related into this class, including the file mounting / resolution.

Member Author:

Good point actually, I should pass in the FileMounter instead of the InitContainer, I will refactor for that.

}
validateNoDuplicateFileNames(sparkJars)
validateNoDuplicateFileNames(sparkFiles)
if (pythonResource.isDefined) {validateNoDuplicateFileNames(pySparkFiles)}
Reviewer:

Use Option.foreach.
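
e.g. (a sketch, mirroring the line above):

pythonResource.foreach(_ => validateNoDuplicateFileNames(pySparkFiles))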

jarsDownloadPath: String,
filesDownloadPath: String) extends ContainerLocalizedFilesResolver {
filesDownloadPath: String
) extends ContainerLocalizedFilesResolver {
Reviewer:

Move this line up - notice the diff from what this had before.

}

override def resolvePrimaryResourceFile(): String = {
Option(primaryPyFile) match {
Reviewer:

Use Option.map. Never use match on Options:

Member Author:

Why not? Is that a Spark-specific Scala practice?

Reviewer:

Not just spark-specific but seems to be the standard across all of Scala.

Reviewer:

See https://www.scala-lang.org/api/current/scala/Option.html

"The most idiomatic way to use an scala.Option instance is to treat it as a collection or monad and use map,flatMap, filter, or foreach... A less-idiomatic way to use scala.Option values is via pattern matching"

def primarySparkResource (containerLocalizedFilesResolver: ContainerLocalizedFilesResolver)
: String = containerLocalizedFilesResolver.resolvePrimaryResourceFile()

def driverPod(
Reviewer:

There might be a better name here - try for something that indicates this is Pyspark specific.

@@ -169,30 +189,85 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
.endMetadata()
}
})
when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver())
when(initContainerComponentsProvider.provideContainerLocalizedFilesResolver(any[String]))
Reviewer:

Don't match on any here - since it's a simple type (String) we should be able to capture the specific argument we're looking for.
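
i.e., stub with the exact expected value rather than any[String] (a sketch; RESOLVED_PYSPARK_PRIMARY_FILE and the returned mock name are hypothetical test constants):

when(initContainerComponentsProvider
    .provideContainerLocalizedFilesResolver(RESOLVED_PYSPARK_PRIMARY_FILE))
  .thenReturn(containerLocalizedFilesResolver)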

@ifilonenko (Member Author):

@erikerlandson @mccheah PTAL

containerLocalizedFilesResolver: ContainerLocalizedFilesResolver) : String =
containerLocalizedFilesResolver.resolvePrimaryResourceFile()

override def driverPod(
Reviewer:

Can we rename this method?

.editSpec()
.editMatchingContainer(new ContainerNameEqualityPredicate(
invocation.getArgumentAt(2, classOf[String])))
.addNewEnv()
Reviewer:

This is still too complex - just add to the labels or the annotations in the metadata.

Member Author:

But the test is checking whether the matching container has its environment variables changed as a result of the function, which is what I am doing here.

Reviewer:

The Client is unaware that the underlying implementation is specifically setting environment variables - so if the Client is unaware, the Client's test should be agnostic to that as well. All the test cares about is "Did the client use the files mounter to alter the driver pod in some way?".

private val SPARK_JARS = Seq.empty[String]
private val JARS_DOWNLOAD_PATH = "/var/data/spark-jars"
private val FILES_DOWNLOAD_PATH = "/var/data/spark-files"
private val localizedFilesResolver = new ContainerLocalizedFilesResolverImpl(
Reviewer:

Don't use the impl - use a mock object.

Reviewer:

Generally a given test should only use the concrete implementation for the class that is under test. All other logical units should be mocks.

Member Author:

Noted, addressed in most recent commit

.withNewSpec()
.addToContainers(driverContainer)
.endSpec()
private val driverFileMounter = new DriverInitContainerComponentsProviderImpl(
Reviewer:

Don't use the impl - use a mock object.

@ifilonenko (Member Author):

rerun unit tests please

1 similar comment
@ifilonenko (Member Author):

rerun unit tests please

private val appArgs: Array[String] ) extends PythonSubmissionResources {

private val pyFiles: Array[String] = {
(Option(appArgs(0)) map (a => mainAppResource +: a.split(",")))
Reviewer:

Put a dot between Option(appArgs(0)) and map. Space delimiting for methods is discouraged in general, and I think there are a few other places where this is done.
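
i.e.:

Option(appArgs(0)).map(a => mainAppResource +: a.split(","))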

Member Author:

Should this be changed in Client.scala as well, or just here?

Reviewer:

Change it everywhere

@ifilonenko (Member Author) commented Jun 30, 2017

PR moved to #364
