sort partition for gpu
wbo4958 committed Sep 18, 2024
1 parent eeca573 commit 1c8c3b9
Showing 5 changed files with 63 additions and 16 deletions.
@@ -93,7 +93,8 @@ class GpuXGBoostPlugin extends XGBoostPlugin {
      selectedCols.append(col)
    }
    val input = dataset.select(selectedCols.toArray: _*)
-   estimator.repartitionIfNeeded(input)
+   val repartitioned = estimator.repartitionIfNeeded(input)
+   estimator.sortPartitionIfNeeded(repartitioned)
  }

  // visible for testing
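The hunk above only calls the new sortPartitionIfNeeded hook; for the Ranker (see the XGBoostRanker hunk further down) it boils down to Spark's Dataset.sortWithinPartitions, which orders rows inside each partition without shuffling them across partitions. A minimal standalone sketch of that behavior, with made-up column names and data (not part of the commit):

    import org.apache.spark.sql.SparkSession

    object SortWithinPartitionsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("sort-within-partitions-sketch").getOrCreate()
        import spark.implicits._

        // Hypothetical ranking rows: (label, group, feature).
        val df = Seq((1.0f, 2, 0.5f), (0.0f, 1, 0.1f), (1.0f, 1, 0.9f), (0.0f, 2, 0.3f))
          .toDF("label", "group", "c1")
          .repartition(2)

        // Rows of the same group become contiguous inside each partition; the
        // partitioning itself is untouched, so no extra shuffle is introduced.
        df.sortWithinPartitions("group").show()

        spark.stop()
      }
    }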
@@ -16,14 +16,14 @@

package ml.dmlc.xgboost4j.scala.spark

-import ai.rapids.cudf.Table
+import ai.rapids.cudf.{OrderByArg, Table}
import ml.dmlc.xgboost4j.java.CudfColumnBatch
import ml.dmlc.xgboost4j.scala.{DMatrix, QuantileDMatrix, XGBoost => ScalaXGBoost}
import ml.dmlc.xgboost4j.scala.rapids.spark.GpuTestSuite
import ml.dmlc.xgboost4j.scala.rapids.spark.SparkSessionHolder.withSparkSession
import ml.dmlc.xgboost4j.scala.spark.Utils.withResource
import org.apache.spark.ml.linalg.DenseVector
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.SparkConf

import java.io.File
@@ -94,7 +94,9 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
    }

    // spark.rapids.sql.enabled is not set explicitly, default to true
-   withSparkSession(new SparkConf(), spark => {checkIsEnabled(spark, true)})
+   withSparkSession(new SparkConf(), spark => {
+     checkIsEnabled(spark, true)
+   })

    // set spark.rapids.sql.enabled to false
    withCpuSparkSession() { spark =>
@@ -503,7 +505,42 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
    }
  }

-  test("Ranker: XGBoost-Spark should match xgboost4j") {
+  test("The group col should be sorted in each partition") {
+    withGpuSparkSession() { spark =>
+      import spark.implicits._
+      val df = Ranking.train.toDF("label", "weight", "group", "c1", "c2", "c3")
+
+      val xgboostParams: Map[String, Any] = Map(
+        "device" -> "cuda",
+        "objective" -> "rank:ndcg"
+      )
+      val features = Array("c1", "c2", "c3")
+      val label = "label"
+      val group = "group"
+
+      val ranker = new XGBoostRanker(xgboostParams)
+        .setFeaturesCol(features)
+        .setLabelCol(label)
+        .setNumWorkers(1)
+        .setNumRound(1)
+        .setGroupCol(group)
+        .setDevice("cuda")
+
+      val processedDf = ranker.getPlugin.get.asInstanceOf[GpuXGBoostPlugin].preprocess(ranker, df)
+      processedDf.rdd.foreachPartition { iter => {
+        var prevGroup = Int.MinValue
+        while (iter.hasNext) {
+          val curr = iter.next()
+          val group = curr.asInstanceOf[Row].getAs[Int](1)
+          assert(prevGroup <= group)
+          prevGroup = group
+        }
+      }
+      }
+    }
+  }
+
+  test("Ranker: XGBoost-Spark should match xgboost4j") {
    withGpuSparkSession() { spark =>
      import spark.implicits._

@@ -534,7 +571,8 @@ class GpuXGBoostPluginSuite extends GpuTestSuite {
        .setDevice("cuda")

      val xgb4jModel = withResource(new GpuColumnBatch(
-       Table.readParquet(new File(trainPath)))) { batch =>
+       Table.readParquet(new File(trainPath)
+       ).orderBy(OrderByArg.asc(df.schema.fieldIndex(group))))) { batch =>
        val cb = new CudfColumnBatch(batch.select(featuresIndices),
          batch.select(df.schema.fieldIndex(label)), null, null,
          batch.select(df.schema.fieldIndex(group)))
@@ -81,6 +81,6 @@ object Regression extends TrainTestData {
}

object Ranking extends TrainTestData {
-  val train = generateRankDataset(300, 10, 555)
-  val test = generateRankDataset(150, 10, 556)
+  val train = generateRankDataset(300, 10, 12, 555)
+  val test = generateRankDataset(150, 10, 12, 556)
}
@@ -134,6 +134,15 @@ private[spark] trait XGBoostEstimator[
    }
  }

+  /**
+   * Sort the dataset within each partition when the estimator requires it (e.g. the Ranker).
+   * @param dataset the input dataset
+   * @return the dataset, sorted within partitions if needed
+   */
+  private[spark] def sortPartitionIfNeeded(dataset: Dataset[_]): Dataset[_] = {
+    dataset
+  }
+
  /**
   * Build the columns indices.
   */
@@ -198,10 +207,10 @@ private[spark] trait XGBoostEstimator[
      case p: HasGroupCol => selectCol(p.groupCol, IntegerType)
      case _ =>
    }
-   val input = repartitionIfNeeded(dataset.select(selectedCols.toArray: _*))
-
-   val columnIndices = buildColumnIndices(input.schema)
-   (input, columnIndices)
+   val repartitioned = repartitionIfNeeded(dataset.select(selectedCols.toArray: _*))
+   val sorted = sortPartitionIfNeeded(repartitioned)
+   val columnIndices = buildColumnIndices(sorted.schema)
+   (sorted, columnIndices)
  }

  /** visible for testing */
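For orientation, a simplified sketch of the hook pattern the two hunks above set up: the base estimator exposes a no-op sortPartitionIfNeeded that preprocess applies after repartitioning, and a subclass can override it. The trait and class below are illustrative stand-ins, not the real XGBoostEstimator/XGBoostRanker:

    import org.apache.spark.sql.Dataset

    // Illustrative stand-in for the estimator trait.
    trait EstimatorLike {
      // Default hook: most estimators need no per-partition ordering.
      def sortPartitionIfNeeded(dataset: Dataset[_]): Dataset[_] = dataset

      // Stand-in for the existing repartition hook (identity here).
      def repartitionIfNeeded(dataset: Dataset[_]): Dataset[_] = dataset

      // preprocess repartitions first, then applies the sort hook.
      def preprocess(dataset: Dataset[_]): Dataset[_] = {
        val repartitioned = repartitionIfNeeded(dataset)
        sortPartitionIfNeeded(repartitioned)
      }
    }

    // Illustrative stand-in for the ranker: groups must be contiguous per partition.
    class RankerLike(groupCol: String) extends EstimatorLike {
      override def sortPartitionIfNeeded(dataset: Dataset[_]): Dataset[_] =
        dataset.sortWithinPartitions(groupCol)
    }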
@@ -62,14 +62,13 @@ class XGBoostRanker(override val uid: String,
  }

  /**
-   * Preprocess the dataset to meet the xgboost input requirement
+   * Sort each partition by the group column, as the ranking objective requires.
   *
   * @param dataset
   * @return
   */
-  override private[spark] def preprocess(dataset: Dataset[_]): (Dataset[_], ColumnIndices) = {
-    val (output, columnIndices) = super.preprocess(dataset)
-    (output.sortWithinPartitions(getGroupCol), columnIndices)
+  override private[spark] def sortPartitionIfNeeded(dataset: Dataset[_]) = {
+    dataset.sortWithinPartitions(getGroupCol)
  }

  override protected def createModel(
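With this override in place, a caller no longer has to pre-sort the input by the group column before fitting a ranker; the estimator sorts each partition itself. A hedged usage sketch (the path and column names are illustrative; parameters mirror the test above):

    import org.apache.spark.sql.SparkSession
    import ml.dmlc.xgboost4j.scala.spark.XGBoostRanker

    object RankerUsageSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()
        // Assumed layout: label, group and three feature columns.
        val train = spark.read.parquet("/path/to/ranking/train")

        val ranker = new XGBoostRanker(Map("objective" -> "rank:ndcg", "device" -> "cuda"))
          .setFeaturesCol(Array("c1", "c2", "c3"))
          .setLabelCol("label")
          .setGroupCol("group")
          .setNumWorkers(1)
          .setNumRound(10)

        val model = ranker.fit(train)
        model.transform(train).show(5)
        spark.stop()
      }
    }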
