From 68948c23db28bb22abb1073b88e925ed9704b4e8 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Wed, 19 Jun 2024 11:15:24 +0800
Subject: [PATCH] update

---
 .../scala/spark/GPUXGBoostPlugin.scala      |  37 +---
 .../scala/spark/XGBoostEstimator.scala      | 187 ++++++++++--------
 .../scala/spark/params/XGBoostParams.scala  |  40 ++--
 3 files changed, 137 insertions(+), 127 deletions(-)

diff --git a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GPUXGBoostPlugin.scala b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GPUXGBoostPlugin.scala
index 05bb326e42b2..711ba67da9e1 100644
--- a/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GPUXGBoostPlugin.scala
+++ b/jvm-packages/xgboost4j-spark-gpu/src/main/scala/ml/dmlc/xgboost4j/scala/spark/GPUXGBoostPlugin.scala
@@ -27,13 +27,6 @@ import org.apache.spark.sql.{Column, Dataset}
 
 import ml.dmlc.xgboost4j.java.{CudfColumnBatch, GpuColumnBatch}
 import ml.dmlc.xgboost4j.scala.QuantileDMatrix
 
-private[spark] case class ColumnIndices(
-    labelId: Int,
-    featuresId: Seq[Int],
-    weightId: Option[Int],
-    marginId: Option[Int],
-    groupId: Option[Int])
-
 class GPUXGBoostPlugin extends XGBoostPlugin {
 
   /**
@@ -51,23 +44,24 @@ class GPUXGBoostPlugin extends XGBoostPlugin {
     hasRapidsPlugin && rapidsEnabled
   }
 
+  // TODO, support numeric type
   private def preprocess[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
       estimator: XGBoostEstimator[T, M],
       dataset: Dataset[_]): Dataset[_] = {
 
     val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty
+
     val features = estimator.getFeaturesCols
     (features.toSeq ++ Seq(estimator.getLabelCol)).foreach { name =>
       val col = estimator.castToFloatIfNeeded(dataset.schema, name)
       selectedCols.append(col)
     }
+
     val input = dataset.select(selectedCols: _*)
-    // TODO add repartition
-    input.repartition(estimator.getNumWorkers)
+    estimator.repartitionIfNeeded(input)
   }
 
-
   /**
    * Convert Dataset to RDD[Watches] which will be fed into XGBoost
    *
@@ -78,32 +72,21 @@ class GPUXGBoostPlugin extends XGBoostPlugin {
   override def buildRddWatches[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
       estimator: XGBoostEstimator[T, M],
       dataset: Dataset[_]): RDD[Watches] = {
-    println("buildRddWatches ---")
+    val train = preprocess(estimator, dataset)
+    val schema = train.schema
 
-    // TODO, check if the feature in featuresCols is numeric.
+    val indices = estimator.buildColumnIndices(schema)
 
-    val features = estimator.getFeaturesCols
     val maxBin = estimator.getMaxBins
     val nthread = estimator.getNthread
-    // TODO cast features to float if possible
-
-    val label = estimator.getLabelCol
-    val missing = Float.NaN
-
-    val train = preprocess(estimator, dataset)
-    val schema = train.schema
-
-    val indices = ColumnIndices(
-      schema.fieldIndex(label),
-      features.map(schema.fieldIndex),
-      None, None, None
-    )
+    val missing = estimator.getMissing
 
+    /** Build QuantileDMatrix on the executor side */
     def buildQuantileDMatrix(iter: Iterator[Table]): QuantileDMatrix = {
       val colBatchIter = iter.map { table =>
         withResource(new GpuColumnBatch(table, null)) { batch =>
           new CudfColumnBatch(
-            batch.slice(indices.featuresId.map(Integer.valueOf).asJava),
+            batch.slice(indices.featureIds.get.map(Integer.valueOf).asJava),
             batch.slice(indices.labelId),
             batch.slice(indices.weightId.getOrElse(-1)),
             batch.slice(indices.marginId.getOrElse(-1)));
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
index 1eb447f49d45..abf75588707a 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostEstimator.scala
@@ -21,6 +21,7 @@ import java.util.ServiceLoader
 
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter
 
+import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 import org.apache.commons.logging.LogFactory
 import org.apache.hadoop.fs.Path
 import org.apache.spark.ml.{Estimator, Model}
@@ -33,21 +34,21 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{ArrayType, FloatType, StructField, StructType}
 
-import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
 import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, XGBoost => SXGBoost}
 import ml.dmlc.xgboost4j.scala.spark.Utils.MLVectorToXGBLabeledPoint
 import ml.dmlc.xgboost4j.scala.spark.params._
 
 /**
- * Hold the column indexes used to get the column index
+ * Hold the column indices
  */
-private case class ColumnIndexes(label: String,
-                                 features: String,
-                                 weight: Option[String] = None,
-                                 baseMargin: Option[String] = None,
-                                 group: Option[String] = None,
-                                 valiation: Option[String] = None)
+private[spark] case class ColumnIndices(
+    labelId: Int,
+    featureId: Option[Int], // the feature type is VectorUDT or Array
+    featureIds: Option[Seq[Int]], // the feature type is columnar
+    weightId: Option[Int],
+    marginId: Option[Int],
+    groupId: Option[Int])
 
 private[spark] trait NonParamVariables[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]] {
 
@@ -108,72 +109,101 @@ private[spark] abstract class XGBoostEstimator[
     }
   }
 
+  /**
+   * Repartition the dataset to numWorkers if needed.
+   *
+   * @param dataset the dataset to be repartitioned
+   * @return the repartitioned dataset
+   */
+  private[spark] def repartitionIfNeeded(dataset: Dataset[_]): Dataset[_] = {
+    val numPartitions = dataset.rdd.getNumPartitions
+    if (getForceRepartition || getNumWorkers != numPartitions) {
+      dataset.repartition(getNumWorkers)
+    } else {
+      dataset
+    }
+  }
+
+  /**
+   * Build the column indices.
+   */
+  private[spark] def buildColumnIndices(schema: StructType): ColumnIndices = {
+
+    // Get feature id(s)
+    val (featureIds: Option[Seq[Int]], featureId: Option[Int]) =
+      if (getFeaturesCols.length != 0) {
+        (Some(getFeaturesCols.map(schema.fieldIndex).toSeq), None)
+      } else {
+        (None, Some(schema.fieldIndex(getFeaturesCol)))
+      }
+
+    // Get the column id for the given parameter if it is defined and non-empty
+    def columnId(param: Param[String]): Option[Int] = {
+      if (isDefined(param) && $(param).nonEmpty) {
+        Some(schema.fieldIndex($(param)))
+      } else {
+        None
+      }
+    }
+
+    // Special handling for the group column
+    val groupId: Option[Int] = this match {
+      case p: HasGroupCol => columnId(p.groupCol)
+      case _ => None
+    }
+
+    ColumnIndices(
+      labelId = columnId(labelCol).get,
+      featureId = featureId,
+      featureIds = featureIds,
+      columnId(weightCol),
+      columnId(baseMarginCol),
+      groupId)
+  }
+
   /**
    * Preprocess the dataset to meet the xgboost input requirement
    *
    * @param dataset
    * @return
    */
-  private def preprocess(dataset: Dataset[_]): (Dataset[_], ColumnIndexes) = {
-    // Columns to be selected for XGBoost
-    val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty
+  private def preprocess(dataset: Dataset[_]): (Dataset[_], ColumnIndices) = {
 
-    val trainingCols: ArrayBuffer[Param[String]] = ArrayBuffer.empty
-    trainingCols.append(labelCol, featuresCol, weightCol, baseMarginCol)
-    this match {
-      case p: HasGroupCol =>
-        trainingCols.append(p.groupCol)
-      case _ =>
-    }
+    // Columns to be selected for XGBoost training
+    val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty
+    val schema = dataset.schema
 
-    val Seq(labelName, featureName, weightName, baseMarginName) =
-      trainingCols.map { c =>
-        if (isDefined(c) && $(c).nonEmpty) {
-          // Validation col should be a boolean column.
-          if (c == validationIndicatorCol || c == featuresCol) {
-            selectedCols.append(col($(c)))
-          } else {
-            selectedCols.append(castToFloatIfNeeded(dataset.schema, $(c)))
-          }
-          Some($(c))
+    def selectCol(c: Param[String]) = {
+      if (isDefined(c) && $(c).nonEmpty) {
+        // Validation col should be a boolean column.
+        if (c == featuresCol) {
+          selectedCols.append(col($(c)))
         } else {
-          None
+          selectedCols.append(castToFloatIfNeeded(schema, $(c)))
         }
       }
+    }
 
-    var input = dataset.select(selectedCols: _*)
-
-    // TODO,
-    //  1. add a parameter to force repartition,
-    //  2. follow xgboost pyspark way check if repartition is needed.
-    val numWorkers = getNumWorkers
-    val numPartitions = dataset.rdd.getNumPartitions
-    input = if (numWorkers == numPartitions) {
-      input
-    } else {
-      input.repartition(numWorkers)
+    Seq(labelCol, featuresCol, weightCol, baseMarginCol).foreach(selectCol)
+    this match {
+      case p: HasGroupCol => selectCol(p.groupCol)
+      case _ =>
     }
-    val columnIndexes = ColumnIndexes(
-      labelName.get,
-      featureName.get,
-      weight = weightName,
-      baseMargin = baseMarginName,
-      group = None,
-      valiation = None)
-    (input, columnIndexes)
+    val input = repartitionIfNeeded(dataset.select(selectedCols: _*))
+
+    val columnIndices = buildColumnIndices(input.schema)
+    (input, columnIndices)
   }
 
   private def toXGBLabeledPoint(dataset: Dataset[_],
-                                columnIndexes: ColumnIndexes): RDD[XGBLabeledPoint] = {
+                                columnIndexes: ColumnIndices): RDD[XGBLabeledPoint] = {
     dataset.rdd.map { case row: Row =>
-      val label = row.getFloat(row.fieldIndex(columnIndexes.label))
-      val features = row.getAs[Vector](columnIndexes.features)
-      val weight = columnIndexes.weight.map(v => row.getFloat(row.fieldIndex(v))).getOrElse(1.0f)
-      val baseMargin = columnIndexes.baseMargin.map(v =>
-        row.getFloat(row.fieldIndex(v))).getOrElse(Float.NaN)
-      val group = columnIndexes.group.map(v =>
-        row.getFloat(row.fieldIndex(v))).getOrElse(-1.0f)
+      val label = row.getFloat(columnIndexes.labelId)
+      val features = row.getAs[Vector](columnIndexes.featureId.get)
+      val weight = columnIndexes.weightId.map(row.getFloat).getOrElse(1.0f)
+      val baseMargin = columnIndexes.marginId.map(row.getFloat).getOrElse(Float.NaN)
+      val group = columnIndexes.groupId.map(row.getFloat).getOrElse(-1.0f)
 
       // TODO support sparse vector.
       // TODO support array
@@ -189,13 +219,13 @@
    * @param columnsOrder the order of columns including weight/group/base margin ...
   * @return RDD
    */
-  def toRdd(dataset: Dataset[_], columnIndexes: ColumnIndexes): RDD[Watches] = {
-    val trainRDD = toXGBLabeledPoint(dataset, columnIndexes)
+  def toRdd(dataset: Dataset[_], columnIndices: ColumnIndices): RDD[Watches] = {
+    val trainRDD = toXGBLabeledPoint(dataset, columnIndices)
 
     val x = getEvalDataset()
     getEvalDataset().map { eval =>
       val (evalDf, _) = preprocess(eval)
-      val evalRDD = toXGBLabeledPoint(evalDf, columnIndexes)
+      val evalRDD = toXGBLabeledPoint(evalDf, columnIndices)
       trainRDD.zipPartitions(evalRDD) { (trainIter, evalIter) =>
         val trainDMatrix = new DMatrix(trainIter)
         val evalDMatrix = new DMatrix(evalIter)
@@ -360,17 +390,19 @@ private[spark] abstract class XGBoostModel[M <: XGBoostModel[M]](
 
     // Be careful about the order of columns
     var schema = dataset.schema
 
-    var hasLeafPredictionCol = false
-    if (isDefined(leafPredictionCol) && getLeafPredictionCol.nonEmpty) {
-      schema = schema.add(StructField(getLeafPredictionCol, ArrayType(FloatType)))
-      hasLeafPredictionCol = true
+    /** If the parameter is defined and non-empty, add it to the schema and return true */
+    def addToSchema(param: Param[String], colName: Option[String] = None): Boolean = {
+      if (isDefined(param) && $(param).nonEmpty) {
+        val name = colName.getOrElse($(param))
+        schema = schema.add(StructField(name, ArrayType(FloatType)))
+        true
+      } else {
+        false
+      }
     }
 
-    var hasContribPredictionCol = false
-    if (isDefined(contribPredictionCol) && getContribPredictionCol.nonEmpty) {
-      schema = schema.add(StructField(getContribPredictionCol, ArrayType(FloatType)))
-      hasContribPredictionCol = true
-    }
+    val hasLeafPredictionCol = addToSchema(leafPredictionCol)
+    val hasContribPredictionCol = addToSchema(contribPredictionCol)
 
     var hasRawPredictionCol = false
     // For classification case, the tranformed col is probability,
     var hasTransformedCol = false
     this match {
       case p: ClassificationParams[_] => // classification case
-        if (isDefined(p.rawPredictionCol) && p.getRawPredictionCol.nonEmpty) {
-          schema = schema.add(
-            StructField(p.getRawPredictionCol, ArrayType(FloatType)))
-          hasRawPredictionCol = true
-        }
-        if (isDefined(p.probabilityCol) && p.getProbabilityCol.nonEmpty) {
-          schema = schema.add(
-            StructField(TMP_TRANSFORMED_COL, ArrayType(FloatType)))
-          hasTransformedCol = true
-        }
+        hasRawPredictionCol = addToSchema(p.rawPredictionCol)
+        hasTransformedCol = addToSchema(p.probabilityCol, Some(TMP_TRANSFORMED_COL))
 
         if (isDefined(predictionCol) && getPredictionCol.nonEmpty) {
           // Let's use transformed col to calculate the prediction
         }
       case _ =>
         // Rename TMP_TRANSFORMED_COL to prediction in the postTransform.
-        if (isDefined(predictionCol) && getPredictionCol.nonEmpty) {
-          schema = schema.add(
-            StructField(TMP_TRANSFORMED_COL, ArrayType(FloatType)))
-          hasTransformedCol = true
-        }
+        hasTransformedCol = addToSchema(predictionCol, Some(TMP_TRANSFORMED_COL))
+
     }
 
     // TODO configurable
diff --git a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
index 776ade43ffb0..f976cad937e5 100644
--- a/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
+++ b/jvm-packages/xgboost4j-spark/src/main/scala/ml/dmlc/xgboost4j/scala/spark/params/XGBoostParams.scala
@@ -94,15 +94,6 @@ trait HasFeaturesCols extends Params {
   }
 }
 
-trait HasValidationIndicatorCol extends Params {
-
-  final val validationIndicatorCol: Param[String] = new Param[String](this,
-    "validationIndicatorCol", "Name of the column that indicates whether each row is for " +
-    "training or for validation. False indicates training; true indicates validation.")
-
-  final def getValidationIndicatorCol: String = $(validationIndicatorCol)
-}
-
 /**
  * A trait to hold non-xgboost parameters
  */
@@ -124,7 +115,7 @@ trait NonXGBoostParams extends Params {
  */
 private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFeaturesCol
   with HasLabelCol with HasBaseMarginCol with HasWeightCol with HasPredictionCol
-  with HasLeafPredictionCol with HasContribPredictionCol with HasValidationIndicatorCol
+  with HasLeafPredictionCol with HasContribPredictionCol
   with RabitParams with NonXGBoostParams with SchemaValidationTrait {
 
   final val numWorkers = new IntParam(this, "numWorkers", "Number of workers used to train xgboost",
     ParamValidators.gtEq(1))
 
   final def getNumRound: Int = $(numRound)
 
+  final val forceRepartition = new BooleanParam(this, "forceRepartition", "If the number of " +
+    "partitions is equal to numWorkers, xgboost won't repartition the dataset. Set " +
+    "forceRepartition to true to force a repartition.")
+
+  final def getForceRepartition: Boolean = $(forceRepartition)
+
   final val numRound = new IntParam(this, "numRound", "The number of rounds for boosting",
     ParamValidators.gtEq(1))
 
   final val numEarlyStoppingRounds = new IntParam(this, "numEarlyStoppingRounds",
     "Number of rounds of decreasing eval metric to tolerate before stopping training",
     ParamValidators.gtEq(0))
 
+  final def getNumEarlyStoppingRounds: Int = $(numEarlyStoppingRounds)
+
   final val inferBatchSize = new IntParam(this, "inferBatchSize", "batch size in rows " +
     "to be grouped for inference", ParamValidators.gtEq(1))
 
   /** @group getParam */
   final def getInferBatchSize: Int = $(inferBatchSize)
 
-  final def getNumEarlyStoppingRounds: Int = $(numEarlyStoppingRounds)
+  /**
+   * The value treated as missing. Default: Float.NaN
+   */
+  final val missing = new FloatParam(this, "missing", "The value treated as missing")
+
+  final def getMissing: Float = $(missing)
 
   setDefault(numRound -> 100, numWorkers -> 1, inferBatchSize -> (32 << 10),
-    numEarlyStoppingRounds -> 0)
+    numEarlyStoppingRounds -> 0, forceRepartition -> false, missing -> Float.NaN,
+    featuresCols -> Array.empty)
 
   addNonXGBoostParam(numWorkers, numRound, numEarlyStoppingRounds, inferBatchSize, featuresCol,
     labelCol, baseMarginCol, weightCol, predictionCol, leafPredictionCol, contribPredictionCol,
-    validationIndicatorCol)
+    forceRepartition, missing, featuresCols)
 
   final def getNumWorkers: Int = $(numWorkers)
 
   def setNumWorkers(value: Int): T = set(numWorkers, value).asInstanceOf[T]
 
+  def setForceRepartition(value: Boolean): T = set(forceRepartition, value).asInstanceOf[T]
+
   def setNumRound(value: Int): T = set(numRound, value).asInstanceOf[T]
 
   def setFeaturesCol(value: String): T = set(featuresCol, value).asInstanceOf[T]
@@ -179,9 +186,6 @@ private[spark] trait SparkParams[T <: Params] extends HasFeaturesCols with HasFe
 
   def setInferBatchSize(value: Int): T = set(inferBatchSize, value).asInstanceOf[T]
 
-  def setValidationIndicatorCol(value: String): T =
-    set(validationIndicatorCol, value).asInstanceOf[T]
-
   def setRabitTrackerTimeout(value: Int): T = set(rabitTrackerTimeout, value).asInstanceOf[T]
 
   def setRabitTrackerHostIp(value: String): T = set(rabitTrackerHostIp, value).asInstanceOf[T]
@@ -210,9 +214,11 @@ private[spark] trait ClassificationParams[T <: Params] extends SparkParams[T]
 
   def setThresholds(value: Array[Double]): T = set(thresholds, value).asInstanceOf[T]
 
+  /**
+   * XGBoost doesn't use validateAndTransformSchema.
+   */
   override def validateAndTransformSchema(schema: StructType,
       fitting: Boolean): StructType = {
-
     var outputSchema = SparkUtils.appendColumn(schema, $(predictionCol), DoubleType)
     outputSchema = SparkUtils.appendVectorUDTColumn(outputSchema, $(rawPredictionCol))
     outputSchema = SparkUtils.appendVectorUDTColumn(outputSchema, $(probabilityCol))
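
Reviewer note, not part of the patch: the repartitioning contract introduced above is easy to exercise in isolation. The sketch below mirrors the rule added in XGBoostEstimator.repartitionIfNeeded using plain Spark, with no xgboost4j dependency; the object name, the local[4] master, and the toy dataset are illustrative assumptions, not code from this PR.

import org.apache.spark.sql.{Dataset, SparkSession}

object RepartitionSketch {
  // Same decision rule as the patch: repartition when forced, or when the
  // current partition count differs from the requested number of workers.
  def repartitionIfNeeded[T](ds: Dataset[T], numWorkers: Int, force: Boolean): Dataset[T] = {
    val numPartitions = ds.rdd.getNumPartitions
    if (force || numWorkers != numPartitions) ds.repartition(numWorkers) else ds
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("repartition-sketch")
      .getOrCreate()

    // Toy dataset; under local[4] it typically starts with 4 partitions.
    val ds = spark.range(1000)

    // Partition count already matches: left untouched, no shuffle.
    println(repartitionIfNeeded(ds, numWorkers = 4, force = false).rdd.getNumPartitions)
    // forceRepartition semantics: same count, but a shuffle is forced.
    println(repartitionIfNeeded(ds, numWorkers = 4, force = true).rdd.getNumPartitions)
    // Count differs from numWorkers: repartitioned to 2.
    println(repartitionIfNeeded(ds, numWorkers = 2, force = false).rdd.getNumPartitions)

    spark.stop()
  }
}

On the estimator itself this behavior is driven by the existing setNumWorkers together with the new setForceRepartition setter, with missing defaulting to Float.NaN per the setDefault above. The related ColumnIndexes-to-ColumnIndices refactor serves the same hot path: resolving column positions once against the schema lets the per-row code call row.getFloat(id) directly instead of a per-row row.fieldIndex(name) lookup.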