
[jvm-packages] do not use multiple jobs to make checkpoints #5082

Merged: 8 commits merged into dmlc:master from the single_job_checkpoint branch on Feb 2, 2020

Conversation

CodingCat (Member) commented Dec 1, 2019:

No description provided.

CodingCat (Member Author):

@trams mind taking a look?

new XGBoostClassifier(paramMap ++ Seq("num_round" -> 4)).fit(training))
private def produceParamMap(checkpointPath: String, checkpointInterval: Int):
Map[String, Any] = {
Map("eta" -> "1", "max_depth" -> "2", "silent" -> "1",
Member:
The "silent" parameter is deprecated.

String checkpointPath = getPath(latestVersion);
InputStream in = fs.open(new Path(checkpointPath));
logger.info("loaded checkpoint from " + checkpointPath);
Booster booster = XGBoost.loadModel(in);
trivialfis (Member) commented Dec 1, 2019:

I have an RFC for distinguishing a model from a checkpoint: #4855, which is implemented in #4732 with some simple documentation. The basic idea is that SaveModel saves only the trees and the objective, while SaveCheckPoints also saves all the hyper-parameters like eta, max_depth, tree_method etc., which guarantees that the previous training can be continued. By continuing previous training, I mean:

  for i in range(4):
      bst.update_one_iter()

  bst.save()
  bst.load()

  for i in range(4):
      bst.update_one_iter()

should be equal to the following when the prediction cache is not involved:

  for i in range(8):
      bst.update_one_iter()

This cannot be done with SaveModel, since it discards all hyper-parameters; loading its output as a checkpoint would revert them to their default values. It seems XGBoosterLoadRabitCheckpoint is more appropriate here.

CodingCat (Member Author):

hmmm... in the JVM part we don't actually care much about that, since the training parameters are corrected by whatever the user passes to the training API.

e.g.

  1. we load a booster with default parameters at https://github.com/dmlc/xgboost/pull/5082/files#diff-095250777d030169ae83818b37ab44b1R530

  2. we continue training with https://github.com/dmlc/xgboost/pull/5082/files#diff-095250777d030169ae83818b37ab44b1R543-R544

  3. in buildDistributedBoosters (https://github.com/dmlc/xgboost/pull/5082/files#diff-095250777d030169ae83818b37ab44b1R442-R443) we call a Java API like https://github.com/dmlc/xgboost/pull/5082/files#diff-095250777d030169ae83818b37ab44b1R354-R357

  4. before we start training, we set the params at https://github.com/dmlc/xgboost/pull/5082/files#diff-314932ca1cabd6bd148e8f8bd85b8947R190
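
Concretely, the flow is roughly the following (a hedged sketch rather than the PR's actual code; checkpointStream and userParams are illustrative names):

  // step 1: the booster deserialized from the checkpoint carries default
  // hyper-parameters only
  Booster booster = XGBoost.loadModel(checkpointStream);
  // step 4: the user-supplied parameter map is applied before training
  // resumes, so the defaults never take effect
  booster.setParams(userParams);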

trivialfis (Member) commented Dec 2, 2019:

@CodingCat Can we consider replacing all of this with the JSON implementation after I manage to merge that PR? Maybe in the next release, after I have tested it enough on the dask side? In that PR you can load/save only the hyper-parameters.

Member:

@CodingCat I provided a sample output of XGBoosterSaveJsonParameters in #4732's documentation; it is pasted below. I believe it's more robust than a language-binding implementation, since it actually walks through all the parameters. It also lets us build some consistency between the language bindings.

{
      "Learner": {
        "generic_parameter": {
          "enable_experimental_json_serialization": "0",
          "gpu_id": "0",
          "gpu_page_size": "0",
          "n_jobs": "0",
          "random_state": "0",
          "seed": "0",
          "seed_per_iteration": "0"
        },
        "gradient_booster": {
          "gbtree_train_param": {
            "num_parallel_tree": "1",
            "predictor": "gpu_predictor",
            "process_type": "default",
            "tree_method": "gpu_hist",
            "updater": "grow_gpu_hist",
            "updater_seq": "grow_gpu_hist"
          },
          "name": "gbtree",
          "updater": {
            "grow_gpu_hist": {
              "gpu_hist_train_param": {
                "debug_synchronize": "0",
                "gpu_batch_nrows": "0",
                "single_precision_histogram": "0"
              },
              "train_param": {
                "alpha": "0",
                "cache_opt": "1",
                "colsample_bylevel": "1",
                "colsample_bynode": "1",
                "colsample_bytree": "1",
                "default_direction": "learn",
                "enable_feature_grouping": "0",
                "eta": "0.300000012",
                "gamma": "0",
                "grow_policy": "depthwise",
                "interaction_constraints": "",
                "lambda": "1",
                "learning_rate": "0.300000012",
                "max_bin": "256",
                "max_conflict_rate": "0",
                "max_delta_step": "0",
                "max_depth": "6",
                "max_leaves": "0",
                "max_search_group": "100",
                "refresh_leaf": "1",
                "sketch_eps": "0.0299999993",
                "sketch_ratio": "2",
                "subsample": "1"
              }
            }
          }
        },
        "learner_train_param": {
          "booster": "gbtree",
          "disable_default_eval_metric": "0",
          "dsplit": "auto",
          "objective": "reg:squarederror"
        },
        "metrics": [],
        "objective": {
          "name": "reg:squarederror",
          "reg_loss_param": {
            "scale_pos_weight": "1"
          }
        }
      },
      "version": [1, 0, 0]
    }

CodingCat (Member Author):

sure

String eventualPath = getPath(boosterToCheckpoint.getVersion());
String tempPath = eventualPath + "-" + UUID.randomUUID();
OutputStream out = fs.create(new Path(tempPath), true);
boosterToCheckpoint.saveModel(out);
Member:

Same as above.

trams (Contributor) left a comment:

Great job. I would fix the problem with the hadoop-hdfs dependency, and then I think it is fine.

This pull request will (probably) solve the slow performance of the checkpointing mechanism for us.

@@ -342,14 +341,25 @@ object XGBoost extends Serializable {
rabitEnv.put("DMLC_TASK_ID", taskId)
rabitEnv.put("DMLC_NUM_ATTEMPT", attempt)
rabitEnv.put("DMLC_WORKER_STOP_PROCESS_ON_ERROR", "false")

val numRounds = xgbExecutionParam.numRounds
val makeCheckpoint = xgbExecutionParam.checkpointParam.checkpointPath != null &&
Contributor:

Optional suggestion: can we make checkpointParam an Option[String], to make it a bit cleaner?

Member Author:

done

val numRounds = xgbExecutionParam.numRounds
val makeCheckpoint = xgbExecutionParam.checkpointParam.checkpointPath != null &&
xgbExecutionParam.checkpointParam.checkpointPath.nonEmpty &&
taskId.toInt == 0
Contributor:

I see you want the first task to save all the checkpoints, which is a good and simple policy. The downside I see is that it potentially gives one task more work (especially if misconfigured). Should we think later about a more complex model? Something like checkpoint_id % NUM_OF_TASKS == task_id (round robin)?

Member Author:

I think we can do that later if we find a perf issue.

Contributor:

Agree

SXGBoost.trainAndSaveCheckpoint(
watches.toMap("train"), xgbExecutionParam.toMap, numRounds,
watches.toMap, metrics, obj, eval,
earlyStoppingRound = numEarlyStoppingRounds, prevBooster, Some(externalCheckpointParams))
Contributor:

This is nitpicking: I suggest using Option(externalCheckpointParams). In your case this creates Some(externalCheckpointParams), but it also checks whether the variable is null first (Option(null) evaluates to None).

If you use Some(x) directly, you may end up with Some(null) after a few code changes, which would lead to all kinds of weird behavior; Some(x)'s implementation expects x != null.

Member Author:

changed

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.3</version>
Contributor:

This may be problematic in our (Criteo) environment, and in general too. The problem is that xgboost-spark currently depends on Spark 2.4.3 (<spark.version>2.4.3</spark.version>), which in turn depends on Hadoop 2.6.5. So I suggest two things:

  1. change it to 2.6.5 unless you really need 2.7.3
  2. move this to the parent pom as a property and use it here

Also note that you are including this as a compile dependency, which may not be ideal in at least some environments, though I am no expert here; I am only familiar with our Hadoop/Yarn cluster and our somewhat unusual way of managing Spark jobs :)

Member Author:

I changed it to provided and kept 2.7.3. The reason is that 2.7.3 is sort of the standard in Spark: if you look at Spark, it only provides pre-built distributions for Hadoop 2.7.
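
For reference, the dependency after this change plausibly looks like the following (a sketch: the provided scope and the 2.7.3 version are what is confirmed above):

  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.7.3</version>
    <scope>provided</scope>
  </dependency>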

Contributor:

I forgot that you are targeting Spark 3.0 in this release, which only supports Hadoop 2.7. Making the dependency provided should solve Criteo's problem.

.map(this::getPath).collect(Collectors.toList());
String eventualPath = getPath(boosterToCheckpoint.getVersion());
String tempPath = eventualPath + "-" + UUID.randomUUID();
OutputStream out = fs.create(new Path(tempPath), true);
Contributor:

We could use try-with-resources here to make this a bit clearer and cleaner:
https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html

It is the same concept as RAII in C++, but in Java.
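
Applied to the snippet above, a minimal sketch (assuming fs, tempPath and boosterToCheckpoint from the surrounding method) would be:

  // the stream is closed automatically, even when saveModel throws
  try (OutputStream out = fs.create(new Path(tempPath), true)) {
    boosterToCheckpoint.saveModel(out);
  }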

try {
fs.delete(new Path(getPath(v)), true);
} catch (IOException e) {
e.printStackTrace();
Contributor:

I suggest using logger.error here. You can copy the message from line 79.
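
For example (the message here is illustrative, not the actual one from line 79):

  try {
    fs.delete(new Path(getPath(v)), true);
  } catch (IOException e) {
    logger.error("failed to delete stale checkpoint at " + getPath(v), e);
  }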

ecm.updateCheckpoint(booster);
}
} catch (Exception e) {
logger.error("failed to save checkpoint in XGBoost4J", e);
Contributor:

I suggest adding iter to the message. It will greatly improve the quality of the exception.

Member Author:

done

int earlyStoppingRounds,
Booster booster,
int checkpointInterval,
String checkpointPath,
Contributor:

I am a bit confused: is there a reason why we don't use a Hadoop Path object here? I would also replace the FileSystem on line 143 with a Configuration object and create the FileSystem on demand, like it is done in the Scala wrapper (see the sketch below).

P.S. Is there a reason why we have Java and Scala wrappers at the same time?
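
A sketch of the FileSystem-on-demand suggestion (conf and checkpointPath are illustrative names):

  // hold a Hadoop Configuration instead of a FileSystem, and derive the
  // FileSystem from the checkpoint path only when it is actually needed
  FileSystem fs = new Path(checkpointPath).getFileSystem(conf);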

Member Author:

I think the Scala wrapper also accepts a string?

The major reason to have two wrappers is to return a Scala Booster.

Contributor:

I see. It makes sense.

-1, null, null);
} catch (IOException e) {
logger.error("training failed in xgboost4j", e);
throw new XGBoostError("training failed in xgboost4j " + e);
Contributor:

I suggest including e as the cause when creating the exception.

Instead of new RuntimeException("message " + e), one can do new RuntimeException("message", e); this way the stack trace of the cause is carried along and resurfaces to the developer faster.

To achieve the same with XGBoostError, I suggest adding a cause parameter and passing it to the parent constructor (see https://docs.oracle.com/javase/7/docs/api/java/lang/Exception.html#Exception(java.lang.String,%20java.lang.Throwable) for more details).
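
A sketch of that change (hypothetical code; the actual XGBoostError class may differ):

  public class XGBoostError extends Exception {
    public XGBoostError(String message) {
      super(message);
    }
    // new: accept the underlying cause so its stack trace is preserved
    public XGBoostError(String message, Throwable cause) {
      super(message, cause);
    }
  }

The call site above then becomes: throw new XGBoostError("training failed in xgboost4j", e).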

Member Author:

done

@CodingCat CodingCat force-pushed the single_job_checkpoint branch from 6485c89 to 284b7ab Compare December 18, 2019 20:41
@CodingCat CodingCat changed the title [WIP][jvm-packages] do not use multiple jobs to make checkpoints [jvm-packages] do not use multiple jobs to make checkpoints Feb 2, 2020
CodingCat (Member Author):

ok, finally got a chance to do the integration test; everything looks fine

single job:

[screenshot]

cp created:

[screenshot]

@CodingCat CodingCat merged commit d7b45fb into dmlc:master Feb 2, 2020
CodingCat added a commit that referenced this pull request Feb 3, 2020
* temp

* temp

* tep

* address the comments

* fix stylistic issues

* fix

* external checkpoint
@hcho3 hcho3 mentioned this pull request Feb 3, 2020

@lock lock bot locked as resolved and limited conversation to collaborators May 5, 2020