Update PandasUDF doc #2089
Conversation
- **Share GPU with JVM**: Let the Python process share JVM GPU. The Python process could run on the
same GPU with JVM.
- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with Spark executor JVM. Without this feature, some use case in PandasUDF(a.k.a an `independent` process) will likely to use other GPUs other than the one we want it to run on. e.g. user can launch a TensorFlow session inside Pandas UDF and the machine contains 8 GPUs. user launchs 8 Spark executors. Without this GPU sharing feature, TensorFlow will automatically use all 8 GPUs it can detects which will definitly conflict with existing Spark executor JVM processes.
Suggested change:
- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with Spark executor JVM. Without this feature, in a non-isolated environment, some use cases with PandasUDF (a.k.a an `independent` process) can try to use GPUs other than the one we want it to run on. For example, the user could launch a TensorFlow session inside Pandas UDF and the machine contains 8 GPUs. Without this GPU sharing feature, TensorFlow will automatically use all 8 GPUs which will conflict with existing Spark executor JVM processes.
Done.
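As an aside, the 8-GPU scenario above can be made concrete with a small, purely illustrative sketch (the session, app name and column are hypothetical, not from the doc): a Pandas UDF that reports which devices its Python worker can see. When nothing restricts the worker, a framework such as TensorFlow started inside the UDF would detect and claim every GPU on the machine.

```
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("gpu-visibility-demo").getOrCreate()

@pandas_udf("string")
def visible_gpus(batch: pd.Series) -> pd.Series:
    # CUDA_VISIBLE_DEVICES is typically unset when nothing isolates the Python
    # worker, which is exactly the case where a TensorFlow session inside the
    # UDF would grab all 8 GPUs on the example machine.
    devices = os.environ.get("CUDA_VISIBLE_DEVICES", "<all devices visible>")
    return pd.Series([devices] * len(batch))

df = spark.range(4)
df.select(visible_gpus(df["id"])).show(truncate=False)
```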
```
--conf spark.rapids.sql.exec.ArrowEvalPythonExec=true \
--conf spark.rapids.sql.exec.MapInPandasExec=false \
did you mean to have these false? or are these false due to performance?
Removed, and added a status table for them for a better overview.
--conf spark.rapids.sql.exec.WindowInPandasExec=true
```
These configs are the switches for each type of PandasUDF execution plan. Some of theme are set to false by default due to not supported or performance issue.
s/theme/them/
Removed as suggested.
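For reference, here is a minimal sketch of flipping these exec switches from a PySpark session instead of `spark-submit`. The `spark.rapids.sql.exec.*` keys are the ones shown in the snippet above; the app name and the choice to enable all three are illustrative only, and the usual RAPIDS Accelerator jars and setup are assumed to be in place.

```
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("pandas-udf-exec-switches")      # illustrative
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    # Per-plan switches for Pandas UDF execution, as listed in the doc:
    .config("spark.rapids.sql.exec.ArrowEvalPythonExec", "true")
    .config("spark.rapids.sql.exec.MapInPandasExec", "true")
    .config("spark.rapids.sql.exec.WindowInPandasExec", "true")
    .getOrCreate()
)
```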
--conf spark.rapids.python.memory.gpu.allocFraction=0.1 \
--conf spark.rapids.python.memory.gpu.maxAllocFraction=0.2 \
```
Same as the [RMM pooling for JVM](../tuning-guide.md#pooled-memory), the pooling here works the same way but for the Python process. `half of the rest GPU memory` will be used by default if it is not specified.
to be clear here, does this mean the python process will assume it can use half the memory of the GPU? I think we should mention the other spark.rapids.memory.gpu.allocFraction setting here.
Updated.
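As a rough back-of-the-envelope illustration of how the Python pool relates to the JVM pool mentioned above (treat the assumption that both fractions are taken against the GPU's total memory as unverified; the exact accounting belongs to the plugin):

```
# Hypothetical sizing exercise for a single 16 GiB GPU.
total_gpu_mem_gib = 16.0
jvm_alloc_fraction = 0.6      # example value for spark.rapids.memory.gpu.allocFraction
python_alloc_fraction = 0.1   # example value for spark.rapids.python.memory.gpu.allocFraction

jvm_pool_gib = total_gpu_mem_gib * jvm_alloc_fraction        # 9.6 GiB for the executor JVM pool
python_pool_gib = total_gpu_mem_gib * python_alloc_fraction  # 1.6 GiB shared by the Python workers

print(f"JVM pool: {jvm_pool_gib:.1f} GiB, Python workers pool: {python_pool_gib:.1f} GiB")
```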
```
--conf spark.rapids.python.concurrentPythonWorkers=2 \
```
This parameter aims to limit the total number of concurrently running `Python process`es in 1 Spark executor. This parameter is set to 0 by default which means there's no limit for concurrent Python workers. Note that for certain cases, setting this value too small may result in a `hang` for your Spark job because a PandasUDF may produce multiple Python processes and each will try to acquire the Python GPU process semaphore. This may bring a deadlock situation because a Spark job will not proceed until all its tasks are finished. For example, in the Pandas UDF stage, 2 tasks are running and each task launches 3 Python processes and we set this parameter to 4.
I'm a little confused by this, how do I know how many python processes I need? On spark I usually get one python process per task. Can we add more detail here?
I made a wrong description here, sorry for that. Discussed with Liangcai today and re-stated it with a more specific example.
Hi, @jlowe @tgravescs @firestarman I updated the doc, main modifications are:
The 3) is the most difficult to explain as it relies heavily on the implementation; please help check whether that part is clear for readers.
|----------------------|----------|--------|
|ArrowEvalPythonExec|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| supported|
|MapInPandasExec| [Map](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#map)| supported|
| WindowInPandasExec | [Window](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-scalar)| supported|
nit: looks like we have extra spaces in a couple of these; we may also switch that column to be second and have use case third.
switched.
Accelerator has a 1-1 mapping support for each of them. Not all PandasUDF types are data-transfer
accelerated at present:

| Spark Execution Plan | Use Case | Status |
can we change `Status` to be `data transfer accelerated` or `accelerated`, just to be more clear?
Used `data transfer accelerated`.
![Python concurrent worker](/docs/img/concurrentPythonWorker.PNG)

In this case, each PandasUDF will launch a Python process. At this moment two python process
in each task acquired their semaphore but neither of them are able to proceed becasue both
because is misspelled
Done.
Just a small update but otherwise lgtm.
Looks good to me, aside from the requested changes.
Add one more example for
LGTM
1. Make sure GPU exclusive mode is disabled. Note that this will not work if you are using exclusive
mode to assign GPUs under spark.
2. Currently the python files are packed into the spark rapids plugin jar.
1. Make sure GPU `exclusive` mode is disabled. Note that this will not work if you are using exclusive
For the On-prem YARN instructions, we add a line "refer to nvidia-smi documentation".
Do we need to add the link or a sample like this?
`nvidia-smi -i 0 -c EXCLUSIVE_PROCESS` # Set GPU 0 to exclusive mode, run as root.
Also, if this is mandatory for YARN, should we say that more explicitly?
Thanks for the review!
Added a line to show the command to set the GPU to Default mode.
I think there's no need to say whether it's for YARN or for another environment. The mode here only applies to the GPU hardware: if the GPU is in Default mode, it will always be in Default mode whether it runs on YARN, Standalone, or Local.
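If it helps readers, a small pre-flight check along these lines could accompany that note. This is only a sketch: it assumes `nvidia-smi` is on the PATH and reads its compute-mode report (`nvidia-smi -q -d COMPUTE`) so the user can confirm each GPU shows `Default` rather than `Exclusive_Process` before enabling GPU scheduling for Pandas UDFs.

```
import subprocess

# Print the "Compute Mode" line for every GPU reported by nvidia-smi.
report = subprocess.run(
    ["nvidia-smi", "-q", "-d", "COMPUTE"],
    capture_output=True, text=True, check=True,
).stdout

for line in report.splitlines():
    if "Compute Mode" in line:
        print(line.strip())
```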
same GPU with JVM.
- **GPU Assignment(Scheduling) in Python Process**: Let the Python process share the same GPU with
Spark executor JVM. Without this feature, in a non-isolated environment, some use cases with
PandasUDF (a.k.a an `independent` process) can try to use GPUs other than the one we want it to
Suggested change:
Pandas UDF (an `independent` python daemon process) can try to use GPUs other than the one we want it to
@@ -174,31 +178,102 @@ To enable _GPU Scheduling for Pandas UDF_, you need to configure your spark job
--py-files ${SPARK_RAPIDS_PLUGIN_JAR}
Could you also change the standalone part's `rapids-4-spark_2.12-0.5.0-SNAPSHOT.jar` to `${SPARK_RAPIDS_PLUGIN_JAR}`?
```

Please note the data transfer acceleration only supports scalar UDF and Scalar iterator UDF currently.
You could choose the exec you need to enable.
Please note: every type of PandasUDF on Spark is run by a specific Spark execution plan. RAPIDS
Suggested change:
Please note: every type of Pandas UDF on Spark is run by a specific Spark execution plan. RAPIDS
Accelerator has a 1-1 mapping support for each of them. Not all PandasUDF types are data-transfer
Suggested change:
Accelerator has a 1-1 mapping support for each of them. Not all Pandas UDF types are data-transfer

| Spark Execution Plan|Data Transfer Accelerated|Use Case|
|----------------------|----------|--------|
|ArrowEvalPythonExec|yes|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| supported|
Suggested change:
|ArrowEvalPythonExec|yes|[Series to Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#series-to-series), [Iterator of Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-series-to-iterator-of-series) and [Iterator of Multiple Series to Iterator of Series](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html#iterator-of-multiple-series-to-iterator-of-series)| supported|
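Since the table maps each Pandas UDF flavor to a Spark execution plan, a minimal Series to Series example may help anchor the first row (names and values are illustrative): this kind of UDF is planned as ArrowEvalPythonExec, or GpuArrowEvalPython when the corresponding RAPIDS exec switch is enabled.

```
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("series-to-series-demo").getOrCreate()

@pandas_udf("double")
def plus_one(values: pd.Series) -> pd.Series:
    # Series to Series scalar Pandas UDF: one pandas batch in, one batch out.
    return values + 1.0

df = spark.range(10).withColumn("v", plus_one("id"))
df.explain()  # the physical plan contains the (Gpu)ArrowEvalPython node
df.show()
```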
deadlock situation because a Spark job will not proceed until all its tasks are finished.

For example, in a specific Spark Stage that contains 3 PandasUDFs, 2 Spark tasks are running and
each task launches 3 Python process while we set this `concurrentPythonWorkers` to 4.
Suggested change:
each task launches 3 Python processes while we set this `spark.rapids.python.concurrentPythonWorkers` to 4.

![Python concurrent worker](../img/concurrentPythonWorker.PNG)

In this case, each PandasUDF will launch a Python process. At this moment two Python process
Suggested change:
In this case, each Pandas UDF will launch a Python process. At this moment two Python processes
in each task(in light green) acquired their semaphore but neither of them are able to proceed
because both of them are waiting for their third semaphore to start the task.
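To see why those numbers hang, here is a small, self-contained counting sketch (plain Python threading, no Spark; purely illustrative). Two tasks each need 3 worker permits, but the pool only has 4, so each task ends up holding 2 permits and neither can acquire its third:

```
from threading import Semaphore

# spark.rapids.python.concurrentPythonWorkers=4 behaves like a pool of 4 permits.
workers = Semaphore(4)

# Two concurrent Spark tasks, each needing 3 Python workers (one per Pandas UDF).
acquired = {"task_1": 0, "task_2": 0}
for _ in range(2):                      # the tasks grab permits alternately
    for task in acquired:
        if workers.acquire(blocking=False):
            acquired[task] += 1

# The pool is now empty: both tasks hold 2 permits and neither gets a 3rd.
for task in acquired:
    got_third = workers.acquire(blocking=False)
    print(task, "holds", acquired[task], "permits; third acquired?", got_third)

# Since a task only releases its permits once it finishes, and neither task can
# finish, the job deadlocks: the limit (4) is below 2 tasks x 3 UDFs = 6.
```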
Another example is to use ArrowEvalPythonExec, with the following code:
Suggested change:
Another example is to use `ArrowEvalPythonExec` with the following code:
+- GpuArrowEvalPython
```
This means each Spark task will trigger 2 Python processes. In this case, if we set
`concurrentPythonWorkers=2`, it will also probably result in a hang as we allow 2 tasks running
Suggested change:
`spark.rapids.python.concurrentPythonWorkers=2`, it will also probably result in a hang as we allow 2 tasks running
and each of them has 2 Python processes. Let's say Task_1_Process_1 and Task_2_Process_1
Suggested change:
and each of them spawns 2 Python processes. Let's say Task_1_Process_1 and Task_2_Process_1
Thanks for the review! Updated.
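The Python code behind that plan fragment is not shown in this excerpt, so as a hedged illustration only: the kind of query that can end up with two separate (Gpu)ArrowEvalPython nodes, and therefore two Python workers per task, is two Pandas UDFs where the second consumes the first's output (all names below are made up).

```
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.appName("chained-pandas-udfs").getOrCreate()

@pandas_udf("double")
def to_celsius(fahrenheit: pd.Series) -> pd.Series:
    return (fahrenheit - 32.0) * 5.0 / 9.0

@pandas_udf("double")
def round_half_degree(celsius: pd.Series) -> pd.Series:
    return (celsius * 2.0).round() / 2.0

df = spark.range(100).selectExpr("cast(id as double) as f")
# The outer UDF depends on the inner one's result, so Spark cannot fuse them
# into a single node and may plan two chained ArrowEvalPython steps instead.
result = df.select(round_half_degree(to_celsius("f")).alias("c"))
result.explain()
```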
Update Pandas UDF doc with more details description Signed-off-by: Allen Xu <wjxiz1992@gmail.com>
7cf1787
Force-pushed from 58f6911 to 7cf1787.
build
@firestarman @wjxiz1992 Just found a possible typo here
Yes it should be
* Update PandasUDF doc Update Pandas UDF doc with more details description Signed-off-by: Allen Xu <wjxiz1992@gmail.com> * Resolve comments * More doc clean * doc clean and table reformat * doc clean * doc clean * Doc update * resolve comments * Resolve comments * Resolve comments * resolve comments * resolve comments * Resolve comments
Fixes #2053.
Update the Pandas UDF doc with a more detailed description.
Signed-off-by: Allen Xu wjxiz1992@gmail.com