[pyspark] Support stage-level scheduling for training #9519
Conversation
@WeichenXu123 @trivialfis @eordentlich please help to review, thanks!
Thank you for the excellent feature! You might need to update the documentation as well.
python-package/xgboost/spark/core.py
Outdated
# Each training task requires cpu cores > total executor cores//2 + 1 which can
# make the tasks be sent to different executors.
#
# Please note that we can't set task_cpu_cores to the value which is smaller than
# total executor cores/2 because only task_gpu_amount can't make sure the tasks be
# sent to different executor even task_gpus=1.0

# Spark-rapids is a project to leverage GPUs to accelerate spark SQL and it can
# really help the performance. If spark-rapids is enabled. we don't allow other
# ETL gpu tasks running alongside training tasks to avoid OOM
I'm a bit confused by this comment. Let me try to rephrase it and see if I got it right.
To send the tasks to different executors, the task must require at least `total executor cores // 2 + 1` CPU cores.
Without configuring the `task_cpu_cores` to a larger value, we can't make sure tasks are sent to different executors even if `task_gpu_amount` is set to 1.
When the `spark-rapids` plugin is enabled for SQL acceleration, we need to prevent other GPU-based ETL tasks from running during training to avoid OOM errors.
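For illustration, here is a minimal sketch (not the PR's exact code; `executor_cores` and `train_rdd` are assumed names) of how a task-level resource request of more than half the executor's cores forces at most one training task per executor:

```python
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

executor_cores = 12  # assumed value of spark.executor.cores

# Requesting more than half of the executor's cores means two such tasks
# can never fit on the same executor, so Spark spreads them out.
task_cpus = executor_cores // 2 + 1

treqs = TaskResourceRequests().cpus(task_cpus).resource("gpu", 1.0)
rp = ResourceProfileBuilder().require(treqs).build

# `train_rdd` stands for the RDD that runs the XGBoost workers:
# train_rdd = train_rdd.withResources(rp)
```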
Yes, you're correct.
Could you please update the comments with a cleaner explanation?
Done.
python-package/xgboost/spark/core.py
Outdated
# task_gpus means how many gpu slots the task requires in a single GPU,
# it doesn't mean how many gpu shares it would like to require, so we
# can set it to any value of (0, 0.5] or 1.
but not [0.5, 1)?
Yeah, Spark will throw an exception if it's in [0.5, 1).
why?
It's a bug in Spark; I deleted the comment.
python-package/xgboost/spark/core.py
Outdated
# Each training task requires cpu cores > total executor cores//2 + 1 which can
# make the tasks be sent to different executors.
This is hacky, and it wastes CPU cores: one xgboost train worker can only use one CPU core, but this makes Spark allocate multiple CPU cores to the task, and those cores cannot be used by other Spark tasks until the xgboost train worker completes.
Yes, that's correct. It will waste some CPU cores. Assume executor_cores=12: the training task will then require 7 cores, but there are still 5 CPU cores left for other Spark tasks.
The ideal way to limit the task number would rely on task_gpu_amount, but the current Spark implementation does not support that. We can report an issue to Spark, and once Spark fixes it we can rework this.
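A quick sketch of the arithmetic behind the trade-off described above (the values are the assumed ones from the example, not fixed by the PR):

```python
executor_cores = 12
task_cpus = executor_cores // 2 + 1      # = 7, so only one training task fits per executor
remaining = executor_cores - task_cpus   # = 5 cores left for other Spark tasks
print(task_cpus, remaining)              # 7 5
```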
Q: "make the tasks be sent to different executors."
Why do we need this rule? One executor can hold multiple Spark tasks; what issues does that cause? For example, we could configure one executor to own 4 CPU cores and 4 GPUs, and then start 4 xgboost train workers in that executor, each allocated 1 CPU and 1 GPU exclusively.
This PR bypasses the scenario where executor.gpu.amount > 1, since that requires some special configuration to make it work.
Then I think we can directly set executor.gpu.amount=1 and executor.cores=1 when we start the Spark application?
@WeichenXu123, we can't use GPU resources to limit the task number. I have a design doc (supporting stage-level scheduling) for spark-rapids-ml with a design pattern similar to xgboost's; it can be found at https://docs.google.com/document/d/1C6s1nqNkOl0roNtRA5x1JPUuM2jykuR00JyMzgt0HvU/edit?usp=sharing and explains why we can't use GPU resources to do the task limitation.
This is a bug. Repro code:

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

val rdd = sc.range(0, 100, 1, 4)
var rdd1 = rdd.repartition(3)
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
val rp = new ResourceProfileBuilder().require(treqs).build
rdd1 = rdd1.mapPartitions(iter => { Thread.sleep(30000); iter }).withResources(rp)
rdd1.collect() // 3 spark tasks run in parallel on one executor
```

launched with:

```
spark-shell --master spark://192.168.31.236:7077 --conf spark.executor.cores=12 --conf spark.executor.resource.gpu.amount=1
```

Let's see whether the Spark team can address the bug. If it can't be fixed for now, let's use the current workaround.
Hi @WeichenXu123, yeah, it looks like a Spark bug. But using task.cpus to limit the task number in this PR can still achieve the same goal as using task.gpu.amount, except for wasting some CPU cores. Can we merge this PR first and change it after Spark fixes this bug?

I did a performance test.

Env: Spark standalone cluster with 2 nodes, each having 12 CPU cores, 64G mem, and 1 TITAN V GPU with 12G mem.
Dataset: Mortgage, 255,962,232 rows in total, 27 features + 1 label.

Result.

Baseline configuration (the training task takes the whole executor):

```
.config("spark.executor.memory", "60g")
.config("spark.executor.cores", 12)
.config("spark.task.cpus", 12)
.config("spark.executor.resource.gpu.amount", 1)
.config("spark.task.resource.gpu.amount", 1)
.config("spark.sql.execution.arrow.maxRecordsPerBatch", 2000000)
```

The training times are 532.36 and 534.46, average 533.41.

Configuration with this PR (stage-level scheduling):

```
.config("spark.executor.memory", "60g")
.config("spark.executor.cores", 12)
.config("spark.task.cpus", 1)
.config("spark.executor.resource.gpu.amount", 1)
.config("spark.task.resource.gpu.amount", 0.08)
.config("spark.sql.execution.arrow.maxRecordsPerBatch", 2000000)
```

The training times are 383.34 and 383.11, average 383.225.

The ratio is 533.41/383.225 = 1.39, which means this PR gives a 39% speedup. The ETL phase here is just reading the parquet file and doing the repartition; if we involve more real ETL operations, I guess the stage-level scheduling PR can bring even more speedup.
Any update on the status of this PR?
Hi @WeichenXu123, could you help review it again? Thanks very much.
LGTM. Once #9519 (comment) is fixed, please file a follow-up PR to remove the workaround code.
Hi @trivialfis, could you help trigger CI?
Could you please provide an example usage that gets everything right (with no warnings), and add comments to the example explaining why each configuration is set the way it is? If I were a user, I would struggle to understand and address all potential warnings.
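For reference, here is a hedged sketch of a session configuration that exercises stage-level scheduling; the values mirror the performance test earlier in this thread and are illustrative, not the only valid choices:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    # Each executor owns 12 CPU cores and 1 GPU (assumed cluster shape).
    .config("spark.executor.cores", 12)
    .config("spark.executor.resource.gpu.amount", 1)
    # Keep ETL tasks cheap: 1 CPU and a small GPU fraction per task, so the
    # read/repartition stages still run with full parallelism.
    .config("spark.task.cpus", 1)
    .config("spark.task.resource.gpu.amount", 0.08)
    .getOrCreate()
)
# The training stage then overrides the task resources via stage-level
# scheduling so that only one XGBoost worker runs per executor/GPU.
```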
I will file a follow-up PR to add the related docs.
if int(executor_cores) == 1:
    # there will be only 1 task running at any time.
    self.logger.info(
Why is this `info` instead of `warning`? Can we pick one consistently? (I like info a bit more, but feel free to pick one as long as it's consistent.)
@wbo4958 Let's draw the line here:
- warning for things that users should fix/address, similar to a bug but slightly less critical.
- info for things that are "good to know", but totally fine if I don't know them.
Changed to `info`.
python-package/xgboost/spark/core.py
Outdated
if float(task_gpu_amount) == float(executor_gpus):
    self.logger.warning(
        "The configuration of cores (exec=%s, task=%s, runnable tasks=1) will "
Is this CPU cores or GPU cores (I don't know what a GPU core is)? The condition is testing the number of GPUs.
No, it's testing how many tasks will run at the same time on the executor.
task_gpu_amount is a fractional value; we can use executor_gpus / task_gpu_amount to get the number of concurrent tasks.
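A small illustration of that calculation (the values are assumed, matching the example configuration above):

```python
executor_gpus = 1.0
task_gpu_amount = 0.08
concurrent_tasks = int(executor_gpus / task_gpu_amount)  # 12 tasks can share the one GPU
# If task_gpu_amount == executor_gpus, only one task can run at a time.
print(concurrent_tasks)
```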
Since you are raising a warning, could you please revise the message so that users understand what you mean by "cores"?
Removed this duplicated message, since Spark itself will warn about the wasted resources.
if not _is_standalone_or_localcluster(sc):
    self.logger.warning(
        "Stage-level scheduling in xgboost requires spark standalone or "
Can it be used on other types of clusters (maybe in the future)? Can any cloud service like EMR or Dataproc be used with this?
Yes. From Spark 3.5.1 on, we can enable this feature on Spark standalone, Kubernetes, and YARN clusters.
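A minimal sketch of what a check like `_is_standalone_or_localcluster` could look like (the actual helper in core.py may differ; this is only an assumption for illustration):

```python
def _is_standalone_or_localcluster(sc) -> bool:
    # Standalone masters use the spark:// scheme; local-cluster is the
    # test-oriented pseudo-cluster mode.
    master = sc.getConf().get("spark.master", "")
    return master.startswith("spark://") or master.startswith("local-cluster")
```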
Do we need a condition to check for 3.5.1?
3.5.1 is not released yet. I will file another PR once it's available.
An XGBoost training task requires exclusive use of the whole GPU, so we have limited it to only 1 task running at a time even for the ETL stages. This typically decreases the parallelism of Spark tasks and ultimately hurts the end-to-end performance of XGBoost training.
The stage-level scheduling we are using in this PR does not change executor resources and hence does not require dynamic allocation to be enabled; the original executors are retained. The features used here, however, are new in Spark 3.4. We also changed the CPU task resources for that barrier stage to ensure only 1 task per GPU.
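A hedged sketch of how such a stage-level override could be attached to the barrier training stage without touching executor resources (`dataset_rdd`, `train_fn`, and `executor_cores` are assumed names, not the PR's exact API):

```python
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

def with_training_profile(dataset_rdd, train_fn, executor_cores=12):
    """Hypothetical helper: run `train_fn` as a barrier stage with its own
    task-level resource requests; executor-level resources stay unchanged,
    so dynamic allocation is not required."""
    treqs = (
        TaskResourceRequests()
        .cpus(executor_cores // 2 + 1)  # > half the cores -> one task per executor
        .resource("gpu", 1.0)           # the training worker takes the whole GPU
    )
    rp = ResourceProfileBuilder().require(treqs).build
    # The resource profile applies only to this barrier stage; earlier ETL
    # stages keep the cheap per-task settings from the Spark conf.
    return dataset_rdd.barrier().mapPartitions(train_fn).withResources(rp)
```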