From 88699833f4f60287c6734f17f09a161248a5a9df Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Fri, 8 Feb 2019 22:10:58 -0800
Subject: [PATCH 1/3] fixes

---
 .../dmlc/xgboost4j/scala/spark/XGBoost.scala  | 58 ++++++++++++++++---
 .../scala/spark/XGBoostGeneralSuite.scala     | 13 ++---
 2 files changed, 56 insertions(+), 15 deletions(-)

diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
index 2fe554064e0c..709601816eb3 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoost.scala
@@ -618,18 +618,34 @@ private object Watches {
     val dms = nameAndlabeledPointGroupSets.map {
       case (name, labeledPointsGroups) =>
         val baseMargins = new mutable.ArrayBuilder.ofFloat
-        val duplicatedItr = labeledPointsGroups.map(labeledPoints => {
-          labeledPoints.map { labeledPoint =>
+        val groupsInfo = new mutable.ArrayBuilder.ofInt
+        val weights = new mutable.ArrayBuilder.ofFloat
+        val iter = labeledPointsGroups.filter(labeledPointGroup => {
+          var groupWeight = -1.0f
+          var groupSize = 0
+          labeledPointGroup.map { labeledPoint => {
+            if (groupWeight < 0) {
+              groupWeight = labeledPoint.weight
+            } else if (groupWeight != labeledPoint.weight) {
+              throw new IllegalArgumentException("the instances in the same group have to be" +
+                s" assigned with the same weight (unexpected weight ${labeledPoint.weight}")
+            }
             baseMargins += labeledPoint.baseMargin
+            groupSize += 1
             labeledPoint
           }
+          }
+          weights += groupWeight
+          groupsInfo += groupSize
+          true
         })
-        val dMatrix = new DMatrix(duplicatedItr.flatMap(_.iterator),
-          cachedDirName.map(_ + s"/$name").orNull)
+        val dMatrix = new DMatrix(iter.flatMap(_.iterator), cachedDirName.map(_ + s"/$name").orNull)
         val baseMargin = fromBaseMarginsToArray(baseMargins.result().iterator)
         if (baseMargin.isDefined) {
           dMatrix.setBaseMargin(baseMargin.get)
         }
+        dMatrix.setGroup(groupsInfo.result())
+        dMatrix.setWeight(weights.result())
         (name, dMatrix)
     }.toArray
     new Watches(dms.map(_._2), dms.map(_._1), cachedDirName)
@@ -645,20 +661,46 @@
     val testPoints = mutable.ArrayBuilder.make[XGBLabeledPoint]
     val trainBaseMargins = new mutable.ArrayBuilder.ofFloat
     val testBaseMargins = new mutable.ArrayBuilder.ofFloat
+    val trainGroups = new mutable.ArrayBuilder.ofInt
     val testGroups = new mutable.ArrayBuilder.ofInt
+    val trainWeights = new mutable.ArrayBuilder.ofFloat
+    val testWeights = new mutable.ArrayBuilder.ofFloat
+
     val trainLabelPointGroups = labeledPointGroups.filter { labeledPointGroup =>
       val accepted = r.nextDouble() <= trainTestRatio
       if (!accepted) {
+        var groupWeight = -1.0f
+        var groupSize = 0
         labeledPointGroup.foreach(labeledPoint => {
           testPoints += labeledPoint
           testBaseMargins += labeledPoint.baseMargin
+          if (groupWeight < 0) {
+            groupWeight = labeledPoint.weight
+          } else if (labeledPoint.weight != groupWeight) {
+            throw new IllegalArgumentException("the instances in the same group have to be" +
+              s" assigned with the same weight (unexpected weight ${labeledPoint.weight}")
+          }
+          groupSize += 1
         })
-        testGroups += labeledPointGroup.length
+        testWeights += groupWeight
+        testGroups += groupSize
       } else {
-        labeledPointGroup.foreach(trainBaseMargins += _.baseMargin)
-        trainGroups += labeledPointGroup.length
+        var groupWeight = -1.0f
+        var groupSize = 0
+        labeledPointGroup.foreach { labeledPoint => {
+          if (groupWeight < 0) {
+            groupWeight = labeledPoint.weight
+          } else if (labeledPoint.weight != groupWeight) {
+            throw new IllegalArgumentException("the instances in the same group have to be" +
+              s" assigned with the same weight (unexpected weight ${labeledPoint.weight}")
+          }
+          trainBaseMargins += labeledPoint.baseMargin
+          groupSize += 1
+        }}
+        trainWeights += groupWeight
+        trainGroups += groupSize
       }
       accepted
     }
@@ -666,10 +708,12 @@
     val trainPoints = trainLabelPointGroups.flatMap(_.iterator)
     val trainMatrix = new DMatrix(trainPoints, cacheDirName.map(_ + "/train").orNull)
     trainMatrix.setGroup(trainGroups.result())
+    trainMatrix.setWeight(trainWeights.result())

     val testMatrix = new DMatrix(testPoints.result().iterator, cacheDirName.map(_ + "/test").orNull)
     if (trainTestRatio < 1.0) {
       testMatrix.setGroup(testGroups.result())
+      testMatrix.setWeight(testWeights.result())
     }

     val trainMargin = fromBaseMarginsToArray(trainBaseMargins.result().iterator)
diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index 865c03c4a4f5..59b7d1b05674 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -293,12 +293,11 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {

   test("distributed training with group data") {
     val trainingRDD = sc.parallelize(Ranking.train, 5)
-    val (booster, metrics) = XGBoost.trainDistributed(
+    val (booster, _) = XGBoost.trainDistributed(
       trainingRDD,
       List("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
-        "objective" -> "binary:logistic", "num_round" -> 5, "num_workers" -> numWorkers,
-        "custom_eval" -> null, "custom_obj" -> null, "use_external_memory" -> false,
-        "missing" -> Float.NaN).toMap,
+        "objective" -> "rank:pairwise", "num_round" -> 5, "num_workers" -> numWorkers,
+        "missing" -> Float.NaN, "use_external_memory" -> false).toMap,
       hasGroup = true)

     assert(booster != null)
@@ -337,8 +336,7 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
       "objective" -> "binary:logistic",
       "num_round" -> 5,
       "num_workers" -> numWorkers)
-    val xgb1 = new XGBoostClassifier(paramMap1)
-    xgb1.setEvalSets(Map("eval1" -> eval1, "eval2" -> eval2))
+    val xgb1 = new XGBoostClassifier(paramMap1).setEvalSets(Map("eval1" -> eval1, "eval2" -> eval2))
     val model1 = xgb1.fit(train)
     assert(model1.summary.validationObjectiveHistory.length === 2)
     assert(model1.summary.validationObjectiveHistory.map(_._1).toSet === Set("eval1", "eval2"))
@@ -367,8 +365,7 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
     val paramMap1 = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
       "objective" -> "rank:pairwise", "num_round" -> 5, "num_workers" -> numWorkers,
       "group_col" -> "group")
-    val xgb1 = new XGBoostRegressor(paramMap1)
-    xgb1.setEvalSets(Map("eval1" -> eval1, "eval2" -> eval2))
+    val xgb1 = new XGBoostRegressor(paramMap1).setEvalSets(Map("eval1" -> eval1, "eval2" -> eval2))
     val model1 = xgb1.fit(train)
     assert(model1 != null)
     assert(model1.summary.validationObjectiveHistory.length === 2)
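
Note on the core change in this first commit: for grouped (ranking) input, the per-partition DMatrix now receives one group-size entry and one weight entry per query group, and all instances inside a group must carry the same weight. Below is a minimal, standalone sketch of that bookkeeping, not part of the patch itself; the names GroupInfoSketch, groupSizesAndWeights and attachGroupInfo are illustrative only, while DMatrix.setGroup, DMatrix.setWeight and XGBLabeledPoint.weight are the APIs the patch relies on.

    import ml.dmlc.xgboost4j.scala.DMatrix
    import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}

    object GroupInfoSketch {
      // Derive ranking metadata from pre-grouped instances: one size and one
      // weight per group, rejecting groups whose instances disagree on weight.
      def groupSizesAndWeights(
          groups: Iterator[Array[XGBLabeledPoint]]): (Array[Int], Array[Float]) = {
        val sizes = Array.newBuilder[Int]
        val weights = Array.newBuilder[Float]
        groups.filter(_.nonEmpty).foreach { group =>
          val w = group.head.weight
          require(group.forall(_.weight == w),
            "the instances in the same group have to be assigned with the same weight")
          sizes += group.length
          weights += w
        }
        (sizes.result(), weights.result())
      }

      // Attach the metadata to an already-built DMatrix, mirroring what the
      // patched Watches code does for each partition.
      def attachGroupInfo(dm: DMatrix, groups: Seq[Array[XGBLabeledPoint]]): Unit = {
        val (sizes, weights) = groupSizesAndWeights(groups.iterator)
        dm.setGroup(sizes)    // one entry per query group
        dm.setWeight(weights) // one weight per group, not per instance
      }
    }
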
From b181157d7d0baabc8bcb71fc3e1e3187cdcd5df0 Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Fri, 8 Feb 2019 22:50:15 -0800
Subject: [PATCH 2/3] fix weights length

---
 .../ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index 59b7d1b05674..cc59c856c364 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -290,7 +290,7 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
     })
     XGBoost.repartitionForTrainingGroup(trainingRDD, 4)
   }
-
+  
   test("distributed training with group data") {
     val trainingRDD = sc.parallelize(Ranking.train, 5)
     val (booster, _) = XGBoost.trainDistributed(
From 283c548bc267c970731a66303a1e365b3e27cb2e Mon Sep 17 00:00:00 2001
From: Nan Zhu
Date: Fri, 8 Feb 2019 23:50:30 -0800
Subject: [PATCH 3/3] fix scalastyle error

---
 .../ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
index cc59c856c364..59b7d1b05674 100644
--- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
+++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostGeneralSuite.scala
@@ -290,7 +290,7 @@ class XGBoostGeneralSuite extends FunSuite with PerTest {
     })
     XGBoost.repartitionForTrainingGroup(trainingRDD, 4)
   }
-  
+
   test("distributed training with group data") {
     val trainingRDD = sc.parallelize(Ranking.train, 5)
     val (booster, _) = XGBoost.trainDistributed(
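
Taken together, the three commits wire per-group weights from the Spark layer down to DMatrix. A hedged usage sketch of ranking training through the Spark estimator follows; the rank:pairwise objective and the group_col parameter are taken from the tests in the first commit, while the DataFrame trainingDF, its column names, and the setWeightCol call are assumptions for illustration and should be checked against the xgboost4j-spark version in use.

    import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor

    // Assumed input: trainingDF has "features", "label", "group" and "weight" columns,
    // where every row of a query group carries the same weight (enforced by this patch).
    val paramMap = Map(
      "eta" -> "1", "max_depth" -> "6", "silent" -> "1",
      "objective" -> "rank:pairwise",
      "num_round" -> 5, "num_workers" -> 2,
      "group_col" -> "group")

    val ranker = new XGBoostRegressor(paramMap)
      .setWeightCol("weight") // assumed setter; per-group weights end up in DMatrix.setWeight
    val model = ranker.fit(trainingDF)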