
[rabit_bootstrap_cache] failed xgb worker recover from other workers #4808

Merged: 39 commits into dmlc:master on Sep 17, 2019

Conversation

chenqin
Contributor

@chenqin chenqin commented Aug 25, 2019

[copy from xgboost/pull/4769] The goal of this PR is to enable failed native xgb workers to retry and restore when training with the approx tree_method. Details of the underlying implementation can be found in dmlc/rabit#98.

Summary:

  • By enabling the rabit_bootstrap_cache=1 setting, users can retry a failed xgb worker without restarting the entire job
  • Add an xgb_recovery test case to Travis that simulates multiple xgb worker failures with the approx tree_method, with prediction accuracy on par
  • Add the needed cfg_ entries to the native checkpoint payload when the user sets rabit_bootstrap_cache=1
  • Backward compatible with old models and with the case where rabit_bootstrap_cache is disabled or not set.

Note: per conversation with @CodingCat, the fast histogram method in master is not yet supported.
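As a rough illustration of the opt-in path, here is a minimal sketch assuming the standard rabit C++ API (rabit::Init/Broadcast/Finalize) and that rabit_bootstrap_cache can be forwarded to rabit::Init as a key=value argument; the exact wiring inside the xgboost CLI and trackers differs, so treat this as a sketch rather than the PR's code.

```cpp
// Sketch: a worker opting in to the bootstrap cache and performing a
// bootstrap-time sync. The parameter forwarding shown here is an assumption.
#include <rabit/rabit.h>
#include <cstdint>
#include <cstdio>
#include <vector>

int main(int argc, char* argv[]) {
  // rabit reads key=value arguments; rabit_bootstrap_cache=1 asks rabit to cache
  // bootstrap-time collective results so a restarted worker can replay them.
  std::vector<char*> args(argv, argv + argc);
  char opt[] = "rabit_bootstrap_cache=1";
  args.push_back(opt);
  rabit::Init(static_cast<int>(args.size()), args.data());

  // Example of a bootstrap action: rank 0 broadcasts a column-sampler seed.
  uint64_t seed = (rabit::GetRank() == 0) ? 2019u : 0u;
  rabit::Broadcast(&seed, sizeof(seed), 0);
  std::printf("[%d/%d] seed = %llu\n", rabit::GetRank(), rabit::GetWorldSize(),
              static_cast<unsigned long long>(seed));

  rabit::Finalize();
  return 0;
}
```

With the cache disabled (the default), a worker restarted before the first checkpoint has no way to recover the results of such bootstrap collectives; with it enabled, the surviving workers can serve them back.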

@hcho3
Collaborator

hcho3 commented Aug 26, 2019

Running tests locally, for faster experimentation: https://xgboost.readthedocs.io/en/latest/contrib/unit_tests.html

@chenqin chenqin force-pushed the rabit_dist branch 3 times, most recently from 9134b63 to df6f715 on August 27, 2019 07:39
@chenqin
Contributor Author

chenqin commented Aug 27, 2019

Some investigation into the build failure caused by checkpoint restore: there seems to be an issue with the XGB CLI LoadCheckPoint where we lose some training parameters that were initially set at learner creation. This happens because we overwrite the learner with the checkpoint payload, in which not all settings were saved.

In essence, we want to keep such parameters outside of the checkpoint payload. Since the restarted worker can determine the proper config for its environment (gpu/cpu/distributed etc.), we can simply merge those configs back into the learner before starting/resuming training.

@trivialfis
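A minimal, self-contained sketch of the idea described above, using toy types rather than the actual xgboost Learner API (ToyLearner and ResolveLocalConfig are hypothetical): environment-specific parameters stay out of the checkpoint payload and are merged back into the learner after restore, since the restarted worker can re-derive them on its own.

```cpp
// Toy illustration only: merge locally derived config back into the learner
// after the checkpoint payload has been restored, before resuming training.
#include <iostream>
#include <map>
#include <string>

struct ToyLearner {
  std::map<std::string, std::string> cfg;
  void SetParam(const std::string& k, const std::string& v) { cfg[k] = v; }
};

// Hypothetical helper: settings derived from this worker's environment
// (gpu/cpu/distributed etc.), not taken from the checkpoint.
std::map<std::string, std::string> ResolveLocalConfig() {
  return {{"tree_method", "approx"}, {"nthread", "4"}};
}

int main() {
  ToyLearner learner;
  // ... assume the checkpoint payload has already been restored into `learner` ...
  for (const auto& kv : ResolveLocalConfig()) {
    learner.SetParam(kv.first, kv.second);  // merge local config back before resuming
  }
  for (const auto& kv : learner.cfg) {
    std::cout << kv.first << "=" << kv.second << "\n";
  }
  return 0;
}
```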

@chenqin
Contributor Author

chenqin commented Aug 29, 2019

I tested commenting out the extra configs in the learner, and the previously failing JVM model save/load tests pass. So it looks like JVM-saved models were affected by the extra configs. @CodingCat FYI
std::set<std::string> saved_configs_ = {};
// {"max_depth", "tree_method", "dsplit",
//  "seed", "silent", "gamma", "min_child_weight"};

Update: this is actually xgboost4j, not xgboost4j-spark. I can help clean it up in an upcoming JVM-focused PR.

@chenqin chenqin changed the title from "[WIP] rebase and pull rabit with latest feature" to "[rabit_bootstrap_cache] failed xgb worker recover from other workers" Aug 30, 2019
@chenqin
Contributor Author

chenqin commented Aug 30, 2019

@trivialfis since you have been working on organizing trainer parameters, can you help review this change? We allow a failed xgb worker to retry with additional saved parameters IF the user opted in via rabit_bootstrap_cache.

cc @hcho3 @CodingCat

Update: accuracy is the same.

Without recovery:
2019-08-30 14:37:43,758 INFO [14:37:43] [11] test-rmse:0.026854
2019-08-30 14:37:44,772 INFO @tracker All nodes finishes job
2019-08-30 14:37:44,772 INFO @tracker 1.1630983352661133 secs between node start and job finish

With recovery:
2019-08-30 01:14:27,988 INFO [01:14:27] [11] test-rmse:0.026854
2019-08-30 01:14:29,012 INFO @tracker All nodes finishes job
2019-08-30 01:14:29,013 INFO @tracker 1.644543170928955 secs between node start and job finish

@trivialfis
Member

Yes. I've also been playing with rabit recently. Will review soon. Thanks for mentioning me.

@chenqin
Contributor Author

chenqin commented Sep 2, 2019

(image: a rabbit; source: https://en.wikipedia.org/wiki/File:Oryctolagus_cuniculus_Rcdo.jpg)

@trivialfis
Member

@chenqin Is this currently critical? I wrote the CMake file before; the reason I build rabit inside XGBoost is that it fails to build on Windows otherwise. Would it be inconvenient for you if I take some time to open a PR for a rabit CMake build file?

Member

@trivialfis trivialfis left a comment

Looks good overall. Please address the comments on SIMD and the TODO item.

Review comments on: src/common/hist_util.cc (2), src/common/quantile.h, src/learner.cc, tests/cli/runxgb.sh (all resolved)
@chenqin
Contributor Author

chenqin commented Sep 12, 2019

Addressed the feedback:

  • Removed the OpenMP directive
  • Explained the TODO; a follow-up will move histogram initialization before LoadCheckPoint.
  • Explained the allreduce in DMatrix::Load; a follow-up will remove the duplicated column size check across the train/eval datasets (see the sketch below).
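For the third point, here is a sketch of what such a column-count sync looks like, assuming the rabit max-allreduce; this is the general shape, not the actual DMatrix::Load code.

```cpp
// Sketch: agree on the global number of columns across workers via a max-allreduce,
// so a shard that happens to miss trailing columns still sees the full width.
#include <rabit/rabit.h>
#include <cstdio>

int main(int argc, char* argv[]) {
  rabit::Init(argc, argv);
  unsigned local_num_col = 100;                 // columns observed in the local shard
  unsigned global_num_col = local_num_col;
  rabit::Allreduce<rabit::op::Max>(&global_num_col, 1);
  std::printf("[%d] local=%u global=%u\n",
              rabit::GetRank(), local_num_col, global_num_col);
  rabit::Finalize();
  return 0;
}
```

Doing this once, rather than separately for the train and eval DMatrix, is what the proposed follow-up would address.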

@chenqin
Contributor Author

chenqin commented Sep 12, 2019

Do we need a secondary reviewer before merging?

@trivialfis
Member

Considering I only started doing distributed computing less than a month ago ...

Collaborator

@hcho3 hcho3 left a comment

LGTM, however I'm not exactly an expert in distributed training either.

@CodingCat @thvasilo @trams

Contributor

@trams trams left a comment

LGTM, but I am not an expert in this part of xgboost and I am not sure I understand what this pull request achieves. More specifically, I do not quite see where recovery from another worker is added in the code.

Review comments on: src/common/hist_util.cc, tests/cli/runxgb.sh (2), tests/travis/setup.sh, .travis.yml (all resolved)
@hcho3
Collaborator

hcho3 commented Sep 12, 2019

@trams Full context can be found in dmlc/rabit#98. The goal is to "implement immutable cache in rabit to help failed worker recover not synced allreduces in bootstrap time." Most of the recovery logic is found in dmlc/rabit#98, and this PR modifies Rabit calls to make use of the new recovery logic.
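For readers less familiar with rabit, the checkpoint/recovery pattern the PR builds on looks roughly like the following. This is a sketch in the style of the rabit tutorial, not the PR's code; the Model type here is a stand-in.

```cpp
// Sketch of rabit's fault-tolerant loop: a restarted worker calls LoadCheckPoint,
// learns which version it last committed, and resumes from there while healthy
// workers keep going. The bootstrap cache extends recovery to collectives that
// happen before the first CheckPoint.
#include <rabit/rabit.h>
#include <cstdio>
#include <vector>

class Model : public rabit::Serializable {
 public:
  std::vector<float> weights;
  void Load(rabit::Stream* fi) override { fi->Read(&weights); }
  void Save(rabit::Stream* fo) const override { fo->Write(weights); }
};

int main(int argc, char* argv[]) {
  rabit::Init(argc, argv);
  Model model;
  int start_iter = rabit::LoadCheckPoint(&model);  // 0 on fresh start, >0 after recovery
  if (start_iter == 0) model.weights.assign(16, 0.0f);

  const int kNumIter = 10;
  for (int iter = start_iter; iter < kNumIter; ++iter) {
    // One round of synced work, e.g. gradient or histogram aggregation.
    rabit::Allreduce<rabit::op::Sum>(model.weights.data(), model.weights.size());
    rabit::CheckPoint(&model);  // commit this version so recovery can resume here
  }
  std::printf("[%d] finished at version %d\n", rabit::GetRank(), rabit::VersionNumber());
  rabit::Finalize();
  return 0;
}
```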

@chenqin chenqin force-pushed the rabit_dist branch 3 times, most recently from b9f5b7d to 3f2306f on September 13, 2019 05:09
@thvasilo
Contributor

thvasilo commented Sep 13, 2019

I'll add here my understanding of the original PR in rabit, hopefully it helps other maintainers with reviewing and understanding the changes. @CodingCat and @chenqin can correct any mistakes in my explanation.

After going through the design doc, my understanding of the purpose of dmlc/rabit#98 is allowing for single-worker recovery vs. the current fail-all recovery (explained below). The doc also mentions handling cases where workers fail before the first checkpoint, during bootstrap actions like getting the number of features (which is done through an allreduce at the beginning of training) or broadcasting the column sampler seed value. It's not 100% clear to me why these are special cases; it's hard to grok the doc for that section (and I'm jet lagged).

If my understanding is correct, in the current implementation, in case a failure happens all workers are shut down and restarted ("fail-all" in the doc), and learning resumes from the last checkpoint. This involves requesting resources from the scheduler (e.g. Yarn) and shuffling all the data again from scratch.
If we have W workers and F failures happen (where a failure is defined as at least one worker failing during a distinct iteration), we would need to request W*(F+1) new instances: W workers to start the job, then W workers again after each failure.
Since all workers, including healthy ones are shut down in case of a failure, we need to transfer the data again to the new workers. In case of W workers and F failures that would mean shuffling data W*F times, according to the doc.

Putting my PhD hat on, I'll note here that I don't fully agree with this definition of a shuffle in the doc, which treats each partition (i.e. worker) as a "shuffle". A "shuffle" (in Spark terms) is a single distribution of the data amongst all workers in the cluster. Each distinct failure currently requires one shuffle, so I'd rather say that F failures require F shuffles, rather than F*W. However we need the per worker granularity later, so I'd say that currently each failure requires us to send W data partitions over the network.

In any case, when we have massive datasets and hundreds of workers, both of these operations, requesting resources and shuffling the data across the network, can be very costly and block training for extended periods of time, especially in multi-tenant clusters.

The proposed solution then is not to kill all workers and start training from the last checkpoint, but rather do a single node recovery: when a node fails, only that one is restarted, the rest of the cluster waits for it to be bootstrapped, and then continues learning.
Compared to the existing approach, a failure would only mean requesting f new instances from the scheduler, where f is the number of failed workers in the current iteration. Accordingly we would only need to send f data partitions over the network (one for each failed worker), which can be highly beneficial when f << W.

The doc continues to explain how the recovery is handled, but I haven't gone through that part in detail. Hope this helps explain the purpose of the PR!
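A worked cost comparison under the doc's accounting, with purely illustrative numbers not taken from the thread (W, F, and f_i as defined above):

```latex
% Illustrative: W = 100 workers, F = 3 distinct failures, one failed worker each (f_i = 1).
\begin{align*}
\text{fail-all: instances requested}      &= W\,(F+1) = 100 \cdot 4 = 400,\\
\text{fail-all: partitions resent}        &= W\,F = 300,\\
\text{single-worker: instances requested} &= W + \sum_{i=1}^{F} f_i = 100 + 3 = 103,\\
\text{single-worker: partitions resent}   &= \sum_{i=1}^{F} f_i = 3.
\end{align*}
```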

@chenqin
Contributor Author

chenqin commented Sep 13, 2019

I appreciate your help explaining things in such a detailed way!

I agree the wording may need more thought. Yes, it is strictly W full reshuffles, assuming every failure causes an entire job retry (before LoadCheckPoint) and we have W failures, one at a time, over the job's life cycle. In real life, those datasets may not be saved in HDFS; instead they are generated from various table-join and feature-extraction stages.

With this PR, along with techniques such as an external shuffle service and a deterministic partitioning scheme, we are moving towards limiting the impact of a single failure to less than a full shuffle. So yes, the comparison is a lower-bound estimate, where ideally there would also be W reshuffles, each on 1/M of the dataset. The worst case should be the same as the current approach: when the cluster loses track of a critical mass of the dataset, it needs to redo everything from the beginning.


@chenqin
Contributor Author

chenqin commented Sep 14, 2019

Can we rerun the JVM tests? They seem flaky.

Are we good to merge this change?

@trivialfis
Member

Restarted. Will merge once the tests pass.

@trivialfis trivialfis merged commit 512f037 into dmlc:master Sep 17, 2019
@trivialfis
Member

@chenqin Merged, big thanks!

@lock lock bot locked as resolved and limited conversation to collaborators Dec 16, 2019