Merge pull request #605 from talalryz/fix_xgboost_sparse_vector_support
Fix xgboost sparse vector support
lucagiovagnoli authored Jan 14, 2020
2 parents beb4fd2 + dc0464d commit af92144
Showing 5 changed files with 107 additions and 51 deletions.
@@ -32,7 +32,7 @@ case class XGBoostBinaryClassificationModel(override val booster: Booster,
   override val numClasses: Int = 2

   def predict(data: DMatrix): Double = {
-    Math.round(booster.predict(data).head(0))
+    Math.round(booster.predict(data, outPutMargin = false, treeLimit = treeLimit).head(0))
   }

   def predictProbabilities(data: DMatrix): Vector = {
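Alongside the sparse-vector fix, this hunk threads the model's configured treeLimit through to xgboost4j instead of relying on the default. A minimal sketch of the difference, assuming a trained Booster and a DMatrix are in scope (the parameter names below are xgboost4j's actual Scala API, including its "outPutMargin" spelling, as used in the diff itself):

import ml.dmlc.xgboost4j.scala.{Booster, DMatrix}

def sketch(booster: Booster, data: DMatrix): Unit = {
  // treeLimit = 0 is xgboost4j's default and means "use every tree".
  val allTrees = booster.predict(data, outPutMargin = false, treeLimit = 0)
  // Passing the model's treeLimit (the parity spec calls setTreeLimit(2))
  // restricts prediction to the first two boosting rounds, matching Spark.
  val firstTwo = booster.predict(data, outPutMargin = false, treeLimit = 2)
  println(s"${allTrees.length} vs ${firstTwo.length} prediction rows")
}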
@@ -1,20 +1,33 @@
 package ml.combust.mleap.xgboost.runtime

-import ml.combust.mleap.tensor.Tensor
+import ml.combust.mleap.tensor.{DenseTensor, SparseTensor, Tensor}
 import ml.dmlc.xgboost4j.LabeledPoint
 import ml.dmlc.xgboost4j.scala.DMatrix
-import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector}

 trait XgbConverters {
   implicit class VectorOps(vector: Vector) {
     def asXGB: DMatrix = {
-      new DMatrix(Iterator(new LabeledPoint(0.0f, null, vector.toDense.values.map(_.toFloat))))
+      vector match {
+        case SparseVector(_, indices, values) => {
+          new DMatrix(Iterator(new LabeledPoint(0.0f, indices, values.map(_.toFloat))))
+        }
+        case DenseVector(values) =>
+          new DMatrix(Iterator(new LabeledPoint(0.0f, null, values.map(_.toFloat))))
+      }
     }
   }

   implicit class DoubleTensorOps(tensor: Tensor[Double]) {
     def asXGB: DMatrix = {
-      new DMatrix(Iterator(new LabeledPoint(0.0f, null, tensor.toDense.rawValues.map(_.toFloat))))
+      tensor match {
+        case SparseTensor(indices, values, _) => {
+          new DMatrix(Iterator(new LabeledPoint(0.0f, indices.map(_.head).toArray, values.map(_.toFloat))))
+        }
+        case DenseTensor(_, _) => {
+          new DMatrix(Iterator(new LabeledPoint(0.0f, null, tensor.toDense.rawValues.map(_.toFloat))))
+        }
+      }
     }
   }
 }
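The effect of the new converters, as a small usage sketch: sparse inputs keep their indices rather than being densified first, so XGBoost can treat absent entries as missing. This assumes the companion object XgbConverters extends XgbConverters that mleap uses to expose the implicit classes:

import ml.combust.mleap.xgboost.runtime.XgbConverters._
import ml.dmlc.xgboost4j.scala.DMatrix
import org.apache.spark.ml.linalg.Vectors

// Sparse path: indices (1, 3) and values go straight into LabeledPoint.
val sparseMatrix: DMatrix = Vectors.sparse(4, Array(1, 3), Array(2.0, 4.0)).asXGB

// Dense path: unchanged from before; indices = null means "all positions".
val denseMatrix: DMatrix = Vectors.dense(1.0, 2.0, 3.0, 4.0).asXGB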
datasources/xgboost_training.csv
@@ -986,15 +986,15 @@
 21.04,65.46,1017.22,63.02,452.12
 14.05,40.69,1015.66,74.39,470.9
 23.48,64.15,1021.08,57.77,450.89
-21.91,63.76,1009.85,76.8,445.04
-24.42,63.07,1011.49,67.39,444.72
-14.26,40.92,1022.07,73.96,460.38
-21.38,58.33,1013.05,72.75,446.8
-15.71,44.06,1018.34,71.69,465.05
-5.78,40.62,1016.55,84.98,484.13
-6.77,39.81,1017.01,87.68,488.27
-23.84,49.21,1013.85,50.36,447.09
-21.17,58.16,1017.16,68.11,452.02
-19.94,58.96,1014.16,66.27,455.55
-8.73,41.92,1029.41,89.72,480.99
+21.91,0,0,0,445.04
+0,0,0,67.39,444.72
+0,0,0,73.96,460.38
+0,58.33,0,0,446.8
+15.71,0,0,0,465.05
+5.78,0,0,0,484.13
+0,0,0,87.68,488.27
+23.84,0,0,0,447.09
+0,0,1017.16,0,452.02
+0,0,0,66.27,455.55
+8.73,0,0,0,480.99
 16.39,41.67,1012.96,61.07,467.68
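The zeroed-out rows above (in datasources/xgboost_training.csv) are what make this a sparse-path regression test: when the spec's VectorAssembler assembles a mostly-zero row, Spark stores the compressed representation, which is a SparseVector. A rough illustration using the first changed row:

import org.apache.spark.ml.linalg.{SparseVector, Vectors}

// 21.91,0,0,0 -> one non-zero entry out of four, so the compressed form
// is sparse; rows like this exercise the new SparseVector branch in asXGB.
val row = Vectors.dense(21.91, 0.0, 0.0, 0.0).compressed
assert(row.isInstanceOf[SparseVector])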
@@ -24,6 +24,8 @@ import scala.collection.mutable
 /**
  * Created by hollinwilkins on 9/16/17.
  */
+case class PowerPlantTableForClassifier(AT: Double, V: Double, AP: Double, RH: Double, PE: Int)
+
 class XGBoostClassificationModelParitySpec extends FunSpec
   with BeforeAndAfterAll {

@@ -46,32 +48,64 @@ class XGBoostClassificationModelParitySpec extends FunSpec
     "nworkers" -> 2
   )

-  val dataset: DataFrame = {
+  private val denseDataset: DataFrame = {
     SparkParityBase.dataset(spark).select("fico_score_group_fnl", "dti").
       filter(col("fico_score_group_fnl") === "500 - 550" ||
         col("fico_score_group_fnl") === "600 - 650")
   }

-  val sparkTransformer: Transformer = {
-    val featurePipeline = new Pipeline().setStages(Array(new StringIndexer().
+  private val sparseDataset: DataFrame = {
+    import spark.sqlContext.implicits._
+
+    spark.sqlContext.sparkContext.textFile(this.getClass.getClassLoader.getResource("datasources/xgboost_training.csv").toString)
+      .map(x => x.split(","))
+      .map(line => PowerPlantTableForClassifier(line(0).toDouble, line(1).toDouble, line(2).toDouble, line(3).toDouble, line(4).toDouble.toInt % 2))
+      .toDF
+  }
+
+  private val featurePipelineForDenseDataset: Transformer = {
+    new Pipeline().setStages(Array(new StringIndexer().
       setInputCol("fico_score_group_fnl").
       setOutputCol("fico_index"),
       new VectorAssembler().
         setInputCols(Array("dti")).
-        setOutputCol("features"))).fit(dataset)
+        setOutputCol("features"))).fit(denseDataset)
+  }
+
+  private val sparkTransformerForSparseDataset: Transformer = {
+    val featureAssembler = new VectorAssembler()
+      .setInputCols(Array("AT", "V", "AP", "RH"))
+      .setOutputCol("features")
+
+    val params = xgboostParams + ("missing" -> 0.0f)
+    val classifier = createClassifier(params, featureAssembler, sparseDataset, "PE")
+    SparkUtil.createPipelineModel(Array(featureAssembler, classifier))
+  }
+
+  private val sparkTransformerForDenseDataset: Transformer = {
+    val classifier = createClassifier(xgboostParams, featurePipelineForDenseDataset, denseDataset, "fico_index")
+    SparkUtil.createPipelineModel(Array(featurePipelineForDenseDataset, classifier))
+  }

-    val classifier = new XGBoostClassifier(xgboostParams).
+  def createClassifier(xgboostParams: Map[String, Any],
+                       featurePipeline: Transformer,
+                       dataset: DataFrame,
+                       outputCol: String): Transformer = {
+    new XGBoostClassifier(xgboostParams).
       setFeaturesCol("features").
       setProbabilityCol("probabilities").
-      setLabelCol("fico_index").
+      setLabelCol(outputCol).
       fit(featurePipeline.transform(dataset)).
       setLeafPredictionCol("leaf_prediction").
       setContribPredictionCol("contrib_prediction").
       setTreeLimit(2)
-
-    SparkUtil.createPipelineModel(Array(featurePipeline, classifier))
   }

   def equalityTest(sparkDataset: DataFrame,
                    mleapDataset: DefaultLeapFrame): Unit = {
     val sparkFeaturesCol = sparkDataset.schema.fieldIndex("features")
@@ -93,7 +127,7 @@ class XGBoostClassificationModelParitySpec extends FunSpec

     sparkDataset.collect().zip(mleapDataset.collect()).foreach {
       case (sp, ml) =>
-        assert(sp.getAs[Vector](sparkFeaturesCol).toDense.values sameElements ml.getTensor[Double](mleapFeaturesCol).rawValues)
+        assert(sp.getAs[Vector](sparkFeaturesCol).toDense.values sameElements ml.getTensor[Double](mleapFeaturesCol).toDense.rawValues)

         val sparkProbabilities = sp.getAs[Vector](sparkProbabilityCol).toArray
         val mleapProbabilities = ml.getTensor[Double](mleapProbabilityCol).toArray
@@ -104,9 +138,9 @@ class XGBoostClassificationModelParitySpec extends FunSpec
           println("SPARK: " + sparkProbabilities.mkString(","))
           println("MLEAP: " + mleapProbabilities.mkString(","))
         }
-
         assert(Math.abs(v2 - v1) < 0.0000001)
       }
+
       val sparkPrediction = sp.getDouble(sparkPredictionCol)
       val mleapPrediction = ml.getDouble(mleapPredictionCol)
       assert(sparkPrediction == mleapPrediction)
@@ -121,47 +155,55 @@ class XGBoostClassificationModelParitySpec extends FunSpec
     }
   }

-  var bundleCache: Option[File] = None
+  var bundleCacheSparse: Option[File] = None
+  var bundleCacheDense: Option[File] = None

-  def serializedModel(transformer: Transformer): File = {
+  def serializeModelToMleapBundle(transformer: Transformer, dataset: DataFrame): File = {
     import ml.combust.mleap.spark.SparkSupport._

     implicit val sbc = SparkBundleContext.defaultContext.withDataset(transformer.transform(dataset))

-    bundleCache.getOrElse {
-      new File("/tmp/mleap/spark-parity").mkdirs()
-      val file = new File(s"/tmp/mleap/spark-parity/${classOf[XGBoostRegressionModelParitySpec].getName}.zip")
-      file.delete()
-
-      for(bf <- managed(BundleFile(file))) {
-        transformer.writeBundle.format(SerializationFormat.Json).save(bf).get
-      }
-
-      bundleCache = Some(file)
-      file
-    }
+    new File("/tmp/mleap/spark-parity").mkdirs()
+    val file = new File(s"/tmp/mleap/spark-parity/${classOf[XGBoostClassificationModelParitySpec].getName}.zip")
+    file.delete()
+
+    for(bf <- managed(BundleFile(file))) {
+      transformer.writeBundle.format(SerializationFormat.Json).save(bf).get
+    }
+    file
   }

-  def mleapTransformer(transformer: Transformer)
-                      (implicit context: SparkBundleContext): frame.Transformer = {
+  def loadMleapTransformerFromBundle(bundleFile: File)
+                                    (implicit context: SparkBundleContext): frame.Transformer = {
     import ml.combust.mleap.runtime.MleapSupport._

-    (for(bf <- managed(BundleFile(serializedModel(transformer)))) yield {
+    (for(bf <- managed(BundleFile(bundleFile))) yield {
       bf.loadMleapBundle().get.root
     }).tried.get
   }

-  it("produces the same results") {
-    val sparkDataset = sparkTransformer.transform(dataset)
-    val mleapSchema = TypeConverters.sparkSchemaToMleapSchema(dataset)
-
-    val data = dataset.collect().map {
-      r => Row(r.toSeq: _*)
-    }
-    val frame = DefaultLeapFrame(mleapSchema, data)
-    val mleapT = mleapTransformer(sparkTransformer)
-    val mleapDataset = mleapT.transform(frame).get
+  def constructLeapFrameFromSparkDataFrame(dataFrame: DataFrame): DefaultLeapFrame = {
+    val mleapSchema = TypeConverters.sparkSchemaToMleapSchema(dataFrame)
+    val data = dataFrame.collect().map {
+      r => Row(r.toSeq: _*)
+    }
+    DefaultLeapFrame(mleapSchema, data)
+  }
+
+  def doTest(sparkTransformer: Transformer, dataset: DataFrame, bundleCache: Option[File]): Unit = {
+    val sparkDataset = sparkTransformer.transform(dataset)
+    val leapFrame = constructLeapFrameFromSparkDataFrame(dataset)
+    val mleapBundle = bundleCache.getOrElse(serializeModelToMleapBundle(sparkTransformer, dataset))
+    val mleapTransformer = loadMleapTransformerFromBundle(mleapBundle)
+    val mleapDataset = mleapTransformer.transform(leapFrame).get

     equalityTest(sparkDataset, mleapDataset)
   }
+
+  it("produces the same results for a dense dataset") {
+    doTest(sparkTransformerForDenseDataset, denseDataset, bundleCacheDense)
+  }
+
+  it("produces the same result for a sparse dataset") {
+    doTest(sparkTransformerForSparseDataset, sparseDataset, bundleCacheSparse)
+  }
 }
@@ -36,6 +36,7 @@ class XGBoostRegressionModelParitySpec extends FunSpec
   private val xgboostParams: Map[String, Any] = Map(
     "eta" -> 0.3,
     "max_depth" -> 2,
+    "missing" -> 0.0f,
     "objective" -> "reg:squarederror",
     "early_stopping_rounds" -> 2,
     "num_round" -> 15
@@ -82,7 +83,7 @@ class XGBoostRegressionModelParitySpec extends FunSpec
       val v1 = sp.getDouble(sparkPredictionCol)
       val v2 = ml.getDouble(mleapPredictionCol)

-      assert(sp.getAs[Vector](sparkFeaturesCol).toDense.values sameElements ml.getTensor[Double](mleapFeaturesCol).rawValues)
+      assert(sp.getAs[Vector](sparkFeaturesCol).toDense.values sameElements ml.getTensor[Double](mleapFeaturesCol).toDense.rawValues)
       assert(Math.abs(v2 - v1) < 0.0001)
     }
   }
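The "missing" -> 0.0f parameter is what keeps the two representations consistent: zeros in a dense vector and absent entries in a sparse one must both be treated as missing by XGBoost. A hedged sketch of the resulting parameter map handed to xgboost4j-spark (column names are illustrative, not from this diff):

import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor

// With "missing" -> 0.0f, an explicit 0.0 in a dense row and an omitted
// index in a sparse row produce the same prediction, which is the parity
// property these specs assert.
val regressor = new XGBoostRegressor(Map(
  "eta" -> 0.3,
  "max_depth" -> 2,
  "missing" -> 0.0f,
  "objective" -> "reg:squarederror",
  "num_round" -> 15
)).setFeaturesCol("features").setLabelCol("label")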
