[RFC] Redesign xgboost-spark checkpoint mechanism #4785

Closed
trams opened this issue Aug 17, 2019 · 4 comments

@trams (Contributor) commented Aug 17, 2019

XGBoost supports reliable distributed training through the xgboost-spark package.
The reliability is achieved through a checkpointing mechanism.

How does it work?

Let's say one wants to train a model with 1500 trees and save a checkpoint after every 100 trees have been built.
Then xgboost-spark will start a Spark application. Its driver will break the training down into 1500 / 100 = 15 steps; for each step it will load a checkpoint (if present) and train the next 100 trees. The Booster is then pulled to the Spark driver and saved to the checkpoint folder under a new name. After the checkpoint has been saved, the driver schedules the next training step.
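A rough sketch of this scenario via the xgboost-spark parameter map; the knob names "checkpoint_path" and "checkpoint_interval" are my assumption of the public configuration, not something taken from this issue:

val xgbParams = Map(
  "num_round" -> 1500,                          // 1500 trees in total
  "checkpoint_interval" -> 100,                 // persist a Booster every 100 trees
  "checkpoint_path" -> "hdfs:///tmp/xgb-ckpt"   // durable folder reread on restart
)
// The driver therefore runs 1500 / 100 = 15 consecutive training steps, each of
// which loads the previous checkpoint (if any) and trains the next 100 trees.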

If one of the nodes (executor or driver) dies during training, the Spark application is terminated.

In this case a user can relaunch the training with the same parameters and supply the same checkpointing folder. If the folder is not empty and contains a valid model, training restarts from the last checkpoint.

From an API perspective, we have a trainForNonRanking function which takes an RDD[LabeledPoint] and the previous Booster and returns a new Booster.

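In place of the screenshot, here is a rough sketch of that shape; the exact parameter list in xgboost-spark differs, so treat the names below as assumptions:

def trainForNonRanking(
    trainingData: RDD[LabeledPoint],   // the training set
    params: Map[String, Any],          // booster parameters
    prevBooster: Booster               // last checkpoint, or null on the first step
): Booster = ???                       // the Booster extended by the next step's trees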

The current implementation has some problems which result in performance degradation when checkpointing is enabled (even if no failures occur). See #3946 for more information.

Quick and dirty fix

When the model is reloaded, one needs to rebuild the caches.
Even after applying this patch one would still pay a performance penalty for enabling checkpointing, but with a step size of 100 trees it would be roughly 100 times smaller.

My suggestion

I suggest refactoring and changing the APIs so that the caches and the current booster (which essentially comprise the state of the current training) are exposed to Spark.

So I suggest creating functions along the following lines:

def trainDistributedNNextSteps(inputMatrix: RDD[(DMatrix, State)]): RDD[(DMatrix, State)] = {
  inputMatrix.mapPartitions { it =>
    // TODO: Ensure each Spark partition has exactly one element
    val (input, state) = it.next()

    val newState = trainNNextSteps(input, state)

    // note that the input DMatrix is passed through *unchanged*
    Iterator(input -> newState)
  }
}

// Trains the next N boosting rounds on one partition's DMatrix and returns the updated state
def trainNNextSteps(matrix: DMatrix, state: State): State = {
  // ...
}

// The state to start from when no checkpoint exists yet
def zeroState: State = ???


Then one can actually use Spark facilities to save the state (including the cache, if that turns out to be useful):

val rdd: RDD[(DMatrix, State)] = ...

rdd.map(_._2).saveAsObjectFile(...)  // or any other Spark persistence mechanism

We can still manually retrieve the booster and save it the old way:

rdd.map(_._2.booster).collect()

Note that the signature of trainDistributedNNextSteps permits chaining calls, so we can make our checkpointing asynchronous, which should reduce latency when many checkpoints are taken.
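A minimal sketch of such chaining with an asynchronous dump; the initial RDD, the fixed step count and the output path are placeholders, while Booster.toByteArray and RDD.saveAsObjectFile are standard xgboost4j / Spark calls:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

var current: RDD[(DMatrix, State)] = initial            // placeholder: matrices paired with zeroState
for (step <- 1 to 15) {
  current = trainDistributedNNextSteps(current).cache()
  val snapshot = current                                 // capture this step's RDD for the async save
  Future {                                               // checkpoint without blocking the next step
    snapshot.map(_._2.booster.toByteArray).saveAsObjectFile(s"hdfs:///tmp/xgb-ckpt/step-$step")
  }
}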

Another benefit is that if we encounter no failures, we do not actually need to rebuild the model training caches at all.

One problem with saving the cache is that the current cache key is a pointer to the matrix. But it does not need to be a pointer; what we actually need is a way to reference the matrix by some unique id. So we can assign a unique, deterministic id to each matrix. Then we can save, load and reuse the cache later.
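A small sketch of what such a deterministic id could look like; how the id is derived here is entirely my assumption, and any stable function of the partition's data would do:

import java.nio.ByteBuffer
import java.security.MessageDigest

// Derive a stable id from the partition index and the label column, instead of
// using the DMatrix object's memory address as the cache key.
def matrixId(partitionIndex: Int, labels: Array[Float]): String = {
  val digest = MessageDigest.getInstance("SHA-256")
  labels.foreach(l => digest.update(ByteBuffer.allocate(4).putFloat(l).array()))
  s"part-$partitionIndex-" + digest.digest().map("%02x".format(_)).mkString
}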

Alternative designs

Move checkpointing to one of the tasks

Instead of saving the checkpoint on the driver, we can save it from the first task. This way we can launch the training of all 1500 trees as a single Spark job and just trigger asynchronous checkpoint dumps every 100 trees, so we won't need to reconstruct the cache after each checkpoint.
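A hedged sketch of this alternative; trainAllRounds, saveToHdfs, trainingData and checkpointPath are hypothetical names, not existing xgboost-spark code:

// Only the task handling partition 0 dumps the Booster, every 100 rounds,
// while the single Spark job keeps training all 1500 trees.
trainingData.mapPartitionsWithIndex { (partitionId, rows) =>
  val booster = trainAllRounds(rows, numRounds = 1500) { (round, b) =>
    if (partitionId == 0 && round % 100 == 0) {
      saveToHdfs(b.toByteArray, s"$checkpointPath/ckpt-$round")   // asynchronous dump
    }
  }
  Iterator(booster)
}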

Personally I dislike this solution because it makes checkpointing very hidden and can probably result in unexpected side effects.

This solution also does not permit saving or loading caches, so we would pay the price of reconstructing them if a failure occurs (which, at least in our case, is not such a big deal).

Force RABIT to save its checkpoints to HDFS and load from them

XGBoost uses RABIT as its reliable AllReduce implementation. If the checkpoint were saved in some durable storage, I do not see why we couldn't load it from there and just use it.

But I do not have enough knowledge of RABIT to assess the feasibility of this solution.

Pros of this solution:

  • This will work for all distributed versions of XGBoost, not just the Spark one.

Cons:

  • Spark is completely unaware of any checkpoints being saved.

Please tell me what you think. This is a Request For Comments and I really want to hear your opinion.

@CodingCat (Member) commented Aug 17, 2019

I just finished writing up RFC #4786.

I am kind of confused about "Personally I dislike this solution because it makes checkpointing very hidden and can probably result in unexpected side effects." What do "hidden" and "unexpected side effects" refer to? It looks like a very subjective concern.

In #4786, I proposed building the external checkpoint mechanism in the XGBoost4J layer as well. From an engineering perspective, XGBoost-Spark passes three variables to XGBoost4J: buildExternalCache, interval and an OutputStream. Then, in the XGBoost4J layer, we simply feed the booster to the OutputStream in the partition where buildExternalCache is true once it has advanced interval iterations.
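A minimal sketch of that idea as I read it; buildExternalCache, interval and the OutputStream come from the comment above, while the per-iteration loop and the use of xgboost4j's Booster.update and Booster.saveModel(OutputStream) are my assumptions:

import java.io.OutputStream
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}

def trainWithExternalCheckpoints(
    booster: Booster,
    dtrain: DMatrix,
    numRounds: Int,
    buildExternalCache: Boolean,
    interval: Int,
    newCheckpointStream: () => OutputStream): Booster = {
  for (iter <- 0 until numRounds) {
    booster.update(dtrain, iter)                           // one boosting round
    if (buildExternalCache && (iter + 1) % interval == 0) {
      val out = newCheckpointStream()                      // fresh stream per checkpoint
      try booster.saveModel(out) finally out.close()       // dump the current model
    }
  }
  booster
}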

@trams (Contributor, Author) commented Aug 17, 2019 via email

@CodingCat (Member) commented:

it is at #4786

@trams trams changed the title [RFC] Redesign xgboost-spark paritioning [RFC] Redesign xgboost-spark checkpoint mechanism Aug 20, 2019
@trams (Contributor, Author) commented Aug 20, 2019

I'll close this issue for now because it is superseded by #4786. Please comment at #4786.

@trams trams closed this as completed Aug 20, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Nov 18, 2019