Commit: update
wbo4958 committed Jun 19, 2024
1 parent 9d188b2 commit 68948c2
Showing 3 changed files with 137 additions and 127 deletions.
@@ -27,13 +27,6 @@ import org.apache.spark.sql.{Column, Dataset}
import ml.dmlc.xgboost4j.java.{CudfColumnBatch, GpuColumnBatch}
import ml.dmlc.xgboost4j.scala.QuantileDMatrix

-private[spark] case class ColumnIndices(
-    labelId: Int,
-    featuresId: Seq[Int],
-    weightId: Option[Int],
-    marginId: Option[Int],
-    groupId: Option[Int])

class GPUXGBoostPlugin extends XGBoostPlugin {

/**
@@ -51,23 +44,24 @@ class GPUXGBoostPlugin extends XGBoostPlugin {
hasRapidsPlugin && rapidsEnabled
}
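  // A hedged sketch of the check implied above; the keys "spark.plugins" and
  // "spark.rapids.sql.enabled" come from the RAPIDS Accelerator, but whether
  // this method reads exactly these keys is an assumption:
  //   conf.get("spark.plugins", "").contains("com.nvidia.spark.SQLPlugin") &&
  //     conf.get("spark.rapids.sql.enabled", "true").toBoolean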

// TODO, support numeric type
private def preprocess[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
estimator: XGBoostEstimator[T, M], dataset: Dataset[_]): Dataset[_] = {

val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty

val features = estimator.getFeaturesCols

(features.toSeq ++ Seq(estimator.getLabelCol)).foreach { name =>
val col = estimator.castToFloatIfNeeded(dataset.schema, name)
selectedCols.append(col)
}

val input = dataset.select(selectedCols: _*)

-    // TODO add repartition
-    input.repartition(estimator.getNumWorkers)
+    estimator.repartitionIfNeeded(input)
}
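  // A minimal sketch of what castToFloatIfNeeded is assumed to do (the real
  // helper lives on the estimator; this is illustrative, not its source):
  //   def castToFloatIfNeeded(schema: StructType, name: String): Column =
  //     if (schema(name).dataType == FloatType) col(name)
  //     else col(name).cast(FloatType).alias(name)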


/**
* Convert Dataset to RDD[Watches] which will be fed into XGBoost
*
@@ -78,32 +72,21 @@ class GPUXGBoostPlugin extends XGBoostPlugin {
override def buildRddWatches[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]](
estimator: XGBoostEstimator[T, M],
dataset: Dataset[_]): RDD[Watches] = {
println("buildRddWatches ---")
val train = preprocess(estimator, dataset)
val schema = train.schema

// TODO, check if the feature in featuresCols is numeric.
val indices = estimator.buildColumnIndices(schema)

val features = estimator.getFeaturesCols
val maxBin = estimator.getMaxBins
val nthread = estimator.getNthread
// TODO cast features to float if possible

val label = estimator.getLabelCol
val missing = Float.NaN

val train = preprocess(estimator, dataset)
val schema = train.schema

val indices = ColumnIndices(
schema.fieldIndex(label),
features.map(schema.fieldIndex),
None, None, None
)
val missing = estimator.getMissing

/** Build the QuantileDMatrix on the executor side */
def buildQuantileDMatrix(iter: Iterator[Table]): QuantileDMatrix = {
val colBatchIter = iter.map { table =>
withResource(new GpuColumnBatch(table, null)) { batch =>
new CudfColumnBatch(
-          batch.slice(indices.featuresId.map(Integer.valueOf).asJava),
+          batch.slice(indices.featureIds.get.map(Integer.valueOf).asJava),
batch.slice(indices.labelId),
batch.slice(indices.weightId.getOrElse(-1)),
batch.slice(indices.marginId.getOrElse(-1)));
@@ -21,6 +21,7 @@ import java.util.ServiceLoader
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters.iterableAsScalaIterableConverter

+import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.fs.Path
import org.apache.spark.ml.{Estimator, Model}
@@ -33,21 +34,21 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, FloatType, StructField, StructType}

-import ml.dmlc.xgboost4j.{LabeledPoint => XGBLabeledPoint}
import ml.dmlc.xgboost4j.scala.{Booster, DMatrix, XGBoost => SXGBoost}
import ml.dmlc.xgboost4j.scala.spark.Utils.MLVectorToXGBLabeledPoint
import ml.dmlc.xgboost4j.scala.spark.params._


 /**
- * Hold the column indexes used to get the column index
+ * Hold the indices of the training columns
  */
-private case class ColumnIndexes(label: String,
-    features: String,
-    weight: Option[String] = None,
-    baseMargin: Option[String] = None,
-    group: Option[String] = None,
-    valiation: Option[String] = None)
+private[spark] case class ColumnIndices(
+    labelId: Int,
+    featureId: Option[Int], // when the features are a single VectorUDT or Array column
+    featureIds: Option[Seq[Int]], // when the features are separate (columnar) columns
+    weightId: Option[Int],
+    marginId: Option[Int],
+    groupId: Option[Int])
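// Illustration (hypothetical schema [f0, f1, label, weight] with columnar
// features):
//   ColumnIndices(labelId = 2, featureId = None, featureIds = Some(Seq(0, 1)),
//     weightId = Some(3), marginId = None, groupId = None)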

private[spark] trait NonParamVariables[T <: XGBoostEstimator[T, M], M <: XGBoostModel[M]] {

@@ -108,72 +109,101 @@ private[spark] abstract class XGBoostEstimator[
}
}

/**
* Repartition the dataset to the numWorkers if needed.
*
* @param dataset the dataset to be repartitioned
* @return the repartitioned dataset
*/
private[spark] def repartitionIfNeeded(dataset: Dataset[_]): Dataset[_] = {
val numPartitions = dataset.rdd.getNumPartitions
if (getForceRepartition || getNumWorkers != numPartitions) {
dataset.repartition(getNumWorkers)
} else {
dataset
}
}
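  // Example (numbers illustrative): with getNumWorkers = 4 and a 10-partition
  // input, the dataset is repartitioned to 4 partitions; with exactly 4
  // partitions and forceRepartition unset, it is returned unchanged.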

/**
* Build the column indices.
*/
private[spark] def buildColumnIndices(schema: StructType): ColumnIndices = {

// Get feature id(s)
val (featureIds: Option[Seq[Int]], featureId: Option[Int]) =
if (getFeaturesCols.length != 0) {
(Some(getFeaturesCols.map(schema.fieldIndex).toSeq), None)
} else {
(None, Some(schema.fieldIndex(getFeaturesCol)))
}

// Resolve the column index from an optional column-name parameter
def columnId(param: Param[String]): Option[Int] = {
if (isDefined(param) && $(param).nonEmpty) {
Some(schema.fieldIndex($(param)))
} else {
None
}
}

// Special handling for the group column
val groupId: Option[Int] = this match {
case p: HasGroupCol => columnId(p.groupCol)
case _ => None
}

ColumnIndices(
labelId = columnId(labelCol).get,
featureId = featureId,
featureIds = featureIds,
columnId(weightCol),
columnId(baseMarginCol),
groupId)
}
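  // Illustration (hypothetical, vector-based input): with featuresCol set to
  // "features" and featuresCols empty, featureIds = None and
  // featureId = Some(schema.fieldIndex("features")); weight/margin/group
  // params that are not defined resolve to None via columnId.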

/**
* Preprocess the dataset to meet the XGBoost input requirements
*
* @param dataset the input dataset
* @return the preprocessed dataset along with the resolved column indices
*/
-  private def preprocess(dataset: Dataset[_]): (Dataset[_], ColumnIndexes) = {
-    // Columns to be selected for XGBoost
-    val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty
+  private def preprocess(dataset: Dataset[_]): (Dataset[_], ColumnIndices) = {

-    val trainingCols: ArrayBuffer[Param[String]] = ArrayBuffer.empty
-    trainingCols.append(labelCol, featuresCol, weightCol, baseMarginCol)
-    this match {
-      case p: HasGroupCol =>
-        trainingCols.append(p.groupCol)
-      case _ =>
-    }
+    // Columns to be selected for XGBoost training
+    val selectedCols: ArrayBuffer[Column] = ArrayBuffer.empty
+    val schema = dataset.schema

-    val Seq(labelName, featureName, weightName, baseMarginName) =
-      trainingCols.map { c =>
-        if (isDefined(c) && $(c).nonEmpty) {
-          // Validation col should be a boolean column.
-          if (c == validationIndicatorCol || c == featuresCol) {
-            selectedCols.append(col($(c)))
-          } else {
-            selectedCols.append(castToFloatIfNeeded(dataset.schema, $(c)))
-          }
-          Some($(c))
-        } else {
-          None
-        }
-      }
+    def selectCol(c: Param[String]) = {
+      if (isDefined(c) && $(c).nonEmpty) {
+        // The features column keeps its original type; other columns are cast
+        // to float if needed.
+        if (c == featuresCol) {
+          selectedCols.append(col($(c)))
+        } else {
+          selectedCols.append(castToFloatIfNeeded(schema, $(c)))
+        }
+      }
+    }

-    var input = dataset.select(selectedCols: _*)
-
-    // TODO,
-    // 1. add a parameter to force repartition,
-    // 2. follow xgboost pyspark way check if repartition is needed.
-    val numWorkers = getNumWorkers
-    val numPartitions = dataset.rdd.getNumPartitions
-    input = if (numWorkers == numPartitions) {
-      input
-    } else {
-      input.repartition(numWorkers)
-    }
-    val columnIndexes = ColumnIndexes(
-      labelName.get,
-      featureName.get,
-      weight = weightName,
-      baseMargin = baseMarginName,
-      group = None,
-      valiation = None)
-    (input, columnIndexes)
+    Seq(labelCol, featuresCol, weightCol, baseMarginCol).foreach(selectCol)
+    this match {
+      case p: HasGroupCol => selectCol(p.groupCol)
+      case _ =>
+    }
+
+    val input = repartitionIfNeeded(dataset.select(selectedCols: _*))
+
+    val columnIndices = buildColumnIndices(input.schema)
+    (input, columnIndices)
}
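  // Hedged sketch of how these pieces are wired together during fit (the
  // call sequence is implied by this file, not quoted from it):
  //   val (input, indices) = preprocess(dataset)
  //   val watches: RDD[Watches] = toRdd(input, indices)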

   private def toXGBLabeledPoint(dataset: Dataset[_],
-      columnIndexes: ColumnIndexes): RDD[XGBLabeledPoint] = {
+      columnIndexes: ColumnIndices): RDD[XGBLabeledPoint] = {
     dataset.rdd.map {
       case row: Row =>
-        val label = row.getFloat(row.fieldIndex(columnIndexes.label))
-        val features = row.getAs[Vector](columnIndexes.features)
-        val weight = columnIndexes.weight.map(v => row.getFloat(row.fieldIndex(v))).getOrElse(1.0f)
-        val baseMargin = columnIndexes.baseMargin.map(v =>
-          row.getFloat(row.fieldIndex(v))).getOrElse(Float.NaN)
-        val group = columnIndexes.group.map(v =>
-          row.getFloat(row.fieldIndex(v))).getOrElse(-1.0f)
+        val label = row.getFloat(columnIndexes.labelId)
+        val features = row.getAs[Vector](columnIndexes.featureId.get)
+        val weight = columnIndexes.weightId.map(row.getFloat).getOrElse(1.0f)
+        val baseMargin = columnIndexes.marginId.map(row.getFloat).getOrElse(Float.NaN)
+        val group = columnIndexes.groupId.map(row.getFloat).getOrElse(-1.0f)

// TODO support sparse vector.
// TODO support array
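        // Hedged sketch of the dense construction implied here (the exact
        // XGBLabeledPoint signature is an assumption — verify against
        // ml.dmlc.xgboost4j.LabeledPoint):
        //   val dense = features.toDense
        //   XGBLabeledPoint(label, dense.size, null, dense.values.map(_.toFloat),
        //     weight, group.toInt, baseMargin)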
@@ -189,13 +219,13 @@ private[spark] abstract class XGBoostEstimator[
* @param columnIndices the column indices of the label/weight/group/base margin columns ...
* @return RDD[Watches]
*/
-  def toRdd(dataset: Dataset[_], columnIndexes: ColumnIndexes): RDD[Watches] = {
-    val trainRDD = toXGBLabeledPoint(dataset, columnIndexes)
+  def toRdd(dataset: Dataset[_], columnIndices: ColumnIndices): RDD[Watches] = {
+    val trainRDD = toXGBLabeledPoint(dataset, columnIndices)

-    val x = getEvalDataset()
     getEvalDataset().map { eval =>
       val (evalDf, _) = preprocess(eval)
-      val evalRDD = toXGBLabeledPoint(evalDf, columnIndexes)
+      val evalRDD = toXGBLabeledPoint(evalDf, columnIndices)
trainRDD.zipPartitions(evalRDD) { (trainIter, evalIter) =>
val trainDMatrix = new DMatrix(trainIter)
val evalDMatrix = new DMatrix(evalIter)
@@ -360,34 +390,28 @@ private[spark] abstract class XGBoostModel[M <: XGBoostModel[M]](
// Be careful about the order of columns
var schema = dataset.schema

-    var hasLeafPredictionCol = false
-    if (isDefined(leafPredictionCol) && getLeafPredictionCol.nonEmpty) {
-      schema = schema.add(StructField(getLeafPredictionCol, ArrayType(FloatType)))
-      hasLeafPredictionCol = true
-    }
-
-    var hasContribPredictionCol = false
-    if (isDefined(contribPredictionCol) && getContribPredictionCol.nonEmpty) {
-      schema = schema.add(StructField(getContribPredictionCol, ArrayType(FloatType)))
-      hasContribPredictionCol = true
-    }
+    /** If the parameter is defined, add it to the schema and return true */
+    def addToSchema(param: Param[String], colName: Option[String] = None): Boolean = {
+      if (isDefined(param) && $(param).nonEmpty) {
+        val name = colName.getOrElse($(param))
+        schema = schema.add(StructField(name, ArrayType(FloatType)))
+        true
+      } else {
+        false
+      }
+    }
+
+    val hasLeafPredictionCol = addToSchema(leafPredictionCol)
+    val hasContribPredictionCol = addToSchema(contribPredictionCol)
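    // Example (hypothetical): with leafPredictionCol set to "leaf", the call
    // above appends StructField("leaf", ArrayType(FloatType)) to `schema` and
    // returns true; an unset parameter leaves the schema unchanged.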

     var hasRawPredictionCol = false
     // For the classification case, the transformed col is the probability,
     // while for others, it's the prediction value.
     var hasTransformedCol = false
     this match {
       case p: ClassificationParams[_] => // classification case
-        if (isDefined(p.rawPredictionCol) && p.getRawPredictionCol.nonEmpty) {
-          schema = schema.add(
-            StructField(p.getRawPredictionCol, ArrayType(FloatType)))
-          hasRawPredictionCol = true
-        }
-        if (isDefined(p.probabilityCol) && p.getProbabilityCol.nonEmpty) {
-          schema = schema.add(
-            StructField(TMP_TRANSFORMED_COL, ArrayType(FloatType)))
-          hasTransformedCol = true
-        }
+        hasRawPredictionCol = addToSchema(p.rawPredictionCol)
+        hasTransformedCol = addToSchema(p.probabilityCol, Some(TMP_TRANSFORMED_COL))

if (isDefined(predictionCol) && getPredictionCol.nonEmpty) {
// Let's use transformed col to calculate the prediction
@@ -400,11 +424,8 @@ private[spark] abstract class XGBoostModel[M <: XGBoostModel[M]](
}
case _ =>
// Rename TMP_TRANSFORMED_COL to prediction in the postTransform.
-        if (isDefined(predictionCol) && getPredictionCol.nonEmpty) {
-          schema = schema.add(
-            StructField(TMP_TRANSFORMED_COL, ArrayType(FloatType)))
-          hasTransformedCol = true
-        }
+        hasTransformedCol = addToSchema(predictionCol, Some(TMP_TRANSFORMED_COL))

}

// TODO configurable