Commit
revert jvm changes for future pr
Chen Qin committed Aug 14, 2019
1 parent 4f913fd commit efc114f
Showing 8 changed files with 24 additions and 56 deletions.
@@ -445,12 +445,6 @@ object XGBoost extends Serializable {
 val parallelismTracker = new SparkParallelismTracker(sc, timeoutRequestWorkers,
 nWorkers)
 val rabitEnv = tracker.getWorkerEnvs
-
-// CHEN QIN, overwrite rabit env, read rabit_cache and debug setting
-for ((k, v) <- params) {
-if (k.startsWith("rabit_")) rabitEnv.put(k, v.asInstanceOf[String])
-}
-
 val boostersAndMetrics = if (hasGroup) {
 trainForRanking(transformedTrainingData.left.get, overriddenParams, rabitEnv,
 checkpointRound, prevBooster, evalSetsMap)
@@ -480,12 +474,11 @@ object XGBoost extends Serializable {
 }
 }.last
 } catch {
-case t: Throwable =>
-// if the job was aborted due to an exception
-logger.error("the job was aborted due to ", t)
-val v = params.getOrElse("rabit_cache", 0)
-if (v == 0) trainingData.sparkContext.stop()
-throw t
+case t: Throwable =>
+// if the job was aborted due to an exception
+logger.error("the job was aborted due to ", t)
+trainingData.sparkContext.stop()
+throw t
 } finally {
 uncacheTrainingData(params.getOrElse("cacheTrainingSet", false).asInstanceOf[Boolean],
 transformedTrainingData)
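
For context, the block reverted in the first hunk above copied every training parameter whose key starts with "rabit_" into the Rabit worker environment before workers were launched; with this commit those keys are no longer consulted on the JVM side. A minimal, hypothetical Scala sketch of that forwarding pattern, assuming params is the training parameter map and rabitEnv is the java.util.Map returned by tracker.getWorkerEnvs:

    import java.util.{Map => JMap}

    object RabitEnvForwarding {
      // Sketch only: forward user-supplied "rabit_*" settings into the Rabit worker env.
      def forwardRabitParams(params: Map[String, Any], rabitEnv: JMap[String, String]): Unit = {
        for ((k, v) <- params if k.startsWith("rabit_")) {
          rabitEnv.put(k, v.toString)
        }
      }
    }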
@@ -189,12 +189,7 @@ class XGBoostClassifier (
 weight, baseMargin, None, dataFrame).head)
 }
 transformSchema(dataset.schema, logging = true)
-var derivedXGBParamMap = MLlib2XGBoostParams
-for ( (k, v) <- xgboostParams) {
-if (k.startsWith("rabit_")) {
-derivedXGBParamMap = derivedXGBParamMap + (k -> v.asInstanceOf[String])
-}
-}
+val derivedXGBParamMap = MLlib2XGBoostParams
 // All non-null param maps in XGBoostClassifier are in derivedXGBParamMap.
 val (_booster, _metrics) = XGBoost.trainDistributed(trainingSet, derivedXGBParamMap,
 hasGroup = false, evalRDDMap)
@@ -185,12 +185,7 @@ class XGBoostRegressor (
 weight, baseMargin, Some(group), dataFrame).head)
 }
 transformSchema(dataset.schema, logging = true)
-var derivedXGBParamMap = MLlib2XGBoostParams
-for ( (k, v) <- xgboostParams) {
-if (k.startsWith("rabit_")) {
-derivedXGBParamMap = derivedXGBParamMap + (k -> v.asInstanceOf[String])
-}
-}
+val derivedXGBParamMap = MLlib2XGBoostParams
 // All non-null param maps in XGBoostRegressor are in derivedXGBParamMap.
 val (_booster, _metrics) = XGBoost.trainDistributed(trainingSet, derivedXGBParamMap,
 hasGroup = group != lit(-1), evalRDDMap)
@@ -149,5 +149,5 @@ class MissingValueHandlingSuite extends FunSuite with PerTest {
 intercept[XGBoostError] {
 new XGBoostClassifier(paramMap).fit(inputDF)
 }
-}
+}
 }
@@ -48,6 +48,7 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
 }
 assert(ss.sparkContext.isStopped === true)
 }
+
 test("fail training elegantly with unsupported objective function") {
 val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
 "objective" -> "wrong_objective_function", "num_class" -> "6", "num_round" -> 5,
@@ -62,6 +63,7 @@ class ParameterSuite extends FunSuite with PerTest with BeforeAndAfterAll {
 waitForSparkContextShutdown()
 }
 }
+
 test("fail training elegantly with unsupported eval metrics") {
 val paramMap = Map("eta" -> "0.1", "max_depth" -> "6", "silent" -> "1",
 "objective" -> "multi:softmax", "num_class" -> "6", "num_round" -> 5,
@@ -233,34 +233,6 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
 assert(error(nextModel._booster) < 0.1)
 }
 
-test("training with failure recovery with rabit cache and debug enabled") {
-val eval = new EvalError()
-val training = buildDataFrame(Classification.train)
-val testDM = new DMatrix(Classification.test.iterator)
-
-val tmpPath = Files.createTempDirectory("model1").toAbsolutePath.toString
-val paramMap = Map("eta" -> "1", "max_depth" -> 2,
-"objective" -> "binary:logistic", "checkpoint_path" -> tmpPath,
-"checkpoint_interval" -> 2, "num_workers" -> numWorkers,
-"rabit_cache" -> "1", "rabit_debug" -> "1", "cacheTrainingSet" -> true)
-
-val prevModel = new XGBoostClassifier(paramMap ++ Seq("num_round" -> 5)).fit(training)
-def error(model: Booster): Float = eval.eval(
-model.predict(testDM, outPutMargin = true), testDM)
-
-// Check only one model is kept after training
-val files = FileSystem.get(sc.hadoopConfiguration).listStatus(new Path(tmpPath))
-assert(files.length == 1)
-assert(files.head.getPath.getName == "8.model")
-val tmpModel = SXGBoost.loadModel(s"$tmpPath/8.model")
-
-// Train next model based on prev model
-val nextModel = new XGBoostClassifier(paramMap ++ Seq("num_round" -> 8)).fit(training)
-assert(error(tmpModel) > error(prevModel._booster))
-assert(error(prevModel._booster) > error(nextModel._booster))
-assert(error(nextModel._booster) < 0.1)
-}
-
 test("repartitionForTrainingGroup with group data") {
 // test different splits to cover the corner cases.
 for (split <- 1 to 20) {
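
The test deleted above exercised checkpoint-based failure recovery together with the custom rabit_cache/rabit_debug switches that this commit reverts. As orientation only, a hypothetical Scala sketch of the same checkpoint flow using just the parameters that remain after the revert (names follow the removed test; training stands in for a prepared DataFrame of labeled features):

    import java.nio.file.Files
    import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier

    // Sketch: checkpointed training without the reverted rabit_* switches.
    val checkpointDir = Files.createTempDirectory("model").toAbsolutePath.toString
    val paramMap = Map(
      "eta" -> "1", "max_depth" -> 2,
      "objective" -> "binary:logistic",
      "checkpoint_path" -> checkpointDir,   // intermediate boosters are written here
      "checkpoint_interval" -> 2,           // write a checkpoint every 2 rounds
      "num_workers" -> 2,
      "cacheTrainingSet" -> true)

    // Train 5 rounds, then continue to 8 rounds, resuming from the last checkpoint.
    val prevModel = new XGBoostClassifier(paramMap ++ Seq("num_round" -> 5)).fit(training)
    val nextModel = new XGBoostClassifier(paramMap ++ Seq("num_round" -> 8)).fit(training)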
@@ -400,6 +372,7 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
 "num_workers" -> numWorkers))
 .fit(buildDataFrame(Regression.train))
 val regDF = buildDataFrame(Regression.test)
+
 val regRet1 = regModel.transform(regDF).collect()
 val regRet2 = regModel.setInferBatchSize(1).transform(regDF).collect()
 val regRet3 = regModel.setInferBatchSize(10).transform(regDF).collect()
@@ -415,8 +388,9 @@ class XGBoostGeneralSuite extends FunSuite with TmpFolderPerSuite with PerTest {
 "objective" -> "binary:logistic",
 "num_round" -> 5,
 "num_workers" -> numWorkers))
-.fit(buildDataFrame(Classification.train))
+.fit(buildDataFrame(Classification.train))
 val clsDF = buildDataFrame(Classification.test)
+
 val clsRet1 = clsModel.transform(clsDF).collect()
 val clsRet2 = clsModel.setInferBatchSize(1).transform(clsDF).collect()
 val clsRet3 = clsModel.setInferBatchSize(10).transform(clsDF).collect()
12 changes: 10 additions & 2 deletions src/cli_main.cc
@@ -202,7 +202,11 @@ void CLITrain(const CLIParam& param) {
 if (version % 2 == 0) {
 LOG(INFO) << "boosting round " << i << ", " << elapsed << " sec elapsed";
 learner->UpdateOneIter(i, dtrain.get());
-rabit::CheckPoint(learner.get());
+if (learner->AllowLazyCheckPoint()) {
+rabit::LazyCheckPoint(learner.get());
+} else {
+rabit::CheckPoint(learner.get());
+}
 version += 1;
 }
 CHECK_EQ(version, rabit::VersionNumber());
@@ -226,7 +230,11 @@ void CLITrain(const CLIParam& param) {
 learner->Save(fo.get());
 }
 
-rabit::CheckPoint(learner.get());
+if (learner->AllowLazyCheckPoint()) {
+rabit::LazyCheckPoint(learner.get());
+} else {
+rabit::CheckPoint(learner.get());
+}
 version += 1;
 CHECK_EQ(version, rabit::VersionNumber());
 }
1 change: 1 addition & 0 deletions src/tree/updater_histmaker.cc
@@ -122,6 +122,7 @@ class HistMaker: public BaseMaker {
 for (int i = 0; i < p_tree->param.num_roots; ++i) {
 (*p_tree)[i].SetLeaf(0.0f, 0);
 }
+
 for (int depth = 0; depth < param_.max_depth; ++depth) {
 // reset and propose candidate split
 this->ResetPosAndPropose(gpair, p_fmat, fwork_set_, *p_tree);