MLeap Serialized Pipeline including a XGBoost Model does not predict same values as Spark Pipeline #625

Closed
irene3030 opened this issue Jan 14, 2020 · 5 comments


@irene3030

irene3030 commented Jan 14, 2020

Hello,

I am currently working on a project where a machine learning model has been created using Apache Spark and XGBoost4J. To deploy this model in a production environment, I've used MLeap and its extension for XGBoost to serialize my pipeline, which includes the following stages: StringIndexer, OneHotEncoderEstimator, VectorAssembler and an XGBoost regression model.

When I read the MLeap bundle back, the predictions produced by the serialized XGBoost model it contains are very different from the ones produced by the same XGBoost model run directly with Spark and XGBoost4J-Spark.

Here is how I create my pipeline, train the model and save it as an MLeap bundle:

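import java.io.File

import ml.combust.bundle.BundleFile
import ml.dmlc.xgboost4j.scala.spark.XGBoostRegressor
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{OneHotEncoderEstimator, StringIndexer, VectorAssembler}
import org.apache.spark.ml.mleap.SparkUtil
import org.apache.spark.sql.DataFrame
import resource._

def createFeaturePipeline(categoricalColumns: Array[String], numericalColumns: Array[String]): Pipeline = {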
    val encodedCategoricalFeatures = categoricalColumns.flatMap{
      feature =>
        val indexer = new StringIndexer()
          .setInputCol(feature)
          .setOutputCol(feature + "_INDEX")
          .setHandleInvalid("keep")

        val oneHotEncoder = new OneHotEncoderEstimator()
          .setInputCols(Array(feature + "_INDEX"))
          .setOutputCols(Array(feature + "_VEC"))
          .setDropLast(false)

        Array(indexer, oneHotEncoder)
    }
    val vecCategoricalFeatures = categoricalColumns.map(e => e.concat("_VEC"))
    val vectorAssembler = new VectorAssembler()
      .setInputCols(vecCategoricalFeatures ++ numericalColumns)
      .setOutputCol(PortatilesModelConstants.featuresName)
    val featurePipeline: Pipeline = new Pipeline().setStages(encodedCategoricalFeatures ++ Array(vectorAssembler))
    featurePipeline
  }

  def createModel(): XGBoostRegressor ={
    val xgbParam = Map(
      "eta" -> 0.1,
      "verbosity" -> 3,
      "missing" -> 0,
      "num_workers" -> 1,
      "num_round" -> 200,
      "nthread" -> 1,
      "alpha" -> 0.0,
      "gamma" -> 0.6,
      "lambda" -> 0.4,
      "maxDepth" -> 7,
      "minChildWeight" -> 5.0,
      "subsample" -> 1.0
    )

    val xgbRegressor: XGBoostRegressor = new XGBoostRegressor(xgbParam)
      .setFeaturesCol(PortatilesModelConstants.featuresName)
      .setLabelCol(PortatilesModelConstants.targetFeature)
    xgbRegressor
  }

  def buildEvaluator(metric: String = "rmse"): RegressionEvaluator = {
    val evaluator = new RegressionEvaluator()
      .setLabelCol(PortatilesModelConstants.targetFeature)
      .setPredictionCol("prediction")
      .setMetricName(metric)
    evaluator
  }

  def savePipelineToBundle(data: DataFrame, pipelineToSave: PipelineModel, path: String): Unit = {
    import ml.combust.mleap.spark.SparkSupport._
    implicit val sbc: SparkBundleContext = SparkBundleContext().withDataset(data)
    new File(path).delete()
    for (bf <- managed(BundleFile("jar:file:" + path))) {
      pipelineToSave.writeBundle.save(bf).get
    }
  }

  // Main flow
  val Array(split20, split80) = loadData(pathData).randomSplit(Array(0.20, 0.80))
  val testSet = split20.cache()
  val trainingSet = split80.cache()
  val categoricalColumns = PortatilesModelConstants.categoricalColumns
  val numericalColumns = PortatilesModelConstants.numericalColumns
  val featurePipelineFitted = createFeaturePipeline(categoricalColumns, numericalColumns).fit(trainingSet)
  val trainingSetTransformed = featurePipelineFitted.transform(trainingSet)
  val xgbModel = createModel().fit(trainingSetTransformed)
  val trainingSetPredicted = xgbModel.transform(trainingSetTransformed)
  val pipelineModel = SparkUtil.createPipelineModel(Array(featurePipelineFitted, xgbModel))

  // Test performance of the model
  val testSetTransformed = pipelineModel.transform(testSet)
  val mae = buildEvaluator("mae").evaluate(testSetTransformed)

  // Save pipeline
  savePipelineToBundle(pipelineModel.transform(trainingSet), pipelineModel, "/tmp/pipelineModel.zip")

(In case it is not clear, PortatilesModelConstants holds constants such as the names of the columns I am working with.)
Below is how I read the MLeap bundle and test the pipeline on the test set. First I transform the test set through the deserialized pipeline; then I convert the result back to a Spark DataFrame and compute the MAE metric:

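import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import ml.combust.mleap.spark.SparkSupport._
import resource._

val bundle = (for(bf <- managed(BundleFile("jar:file:/tmp/pipelineModel.zip"))) yield {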
  bf.loadMleapBundle().get.root
}).tried.get

val rows = testSet.toSparkLeapFrame.dataset
val schema = testSet.toSparkLeapFrame.schema
val leapFrame = DefaultLeapFrame(schema, rows.collect().toSeq)
val leapFrameTransformed: DefaultLeapFrame = bundle.transform(leapFrame).get
val testSetTransformedMLeap: Seq[Row] = leapFrameTransformed.dataset
val columns = testSet.columns ++ Array("prediction")

// Convert the rows produced by MLeap back to a Spark DataFrame so that model metrics (R2, MAE) can be computed.
// Note: .toDF on a Seq of tuples requires spark.implicits._ to be in scope.
def mleapToSparkDF(mleapFrame: Seq[ml.combust.mleap.runtime.frame.Row], columns: Array[String]): DataFrame = {
  mleapFrame.map{
    mleapRow => (
        mleapRow.getString(0), // product condition
        mleapRow.getString(1), // operating system
        mleapRow.getString(2), // charger
        mleapRow.getString(3), // processor
        mleapRow.getString(4), // brand
        mleapRow.getString(5), // model
        mleapRow.getString(6), // graphics card type
        mleapRow.getString(7), // graphics card model
        mleapRow.getString(8), // touchscreen
        mleapRow.getDouble(9), // SSD capacity
        mleapRow.getInt(10), // screen size in inches
        mleapRow.getDouble(11), // HDD capacity
        mleapRow.getDouble(12), // RAM
        mleapRow.getDouble(13), // graphics card capacity
        mleapRow.getDouble(14), // original price
        mleapRow.getLong(15), // idx
      //mleapRow.getAs[Tensor[Double]](index=33),
        mleapRow.getDouble(35) // predicted price
      )
  }.toDF(columns:_*)
}
val testSetTransformed2 = mleapToSparkDF(testSetTransformedMLeap, columns)
val mae = buildEvaluator("mae").evaluate(testSetTransformed2)
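
The hard-coded index 35 for the prediction column is consistent with the pipeline layout: 16 input columns, then an _INDEX and a _VEC column for each of the 9 categorical features, then the assembled feature vector, then the prediction. A minimal sketch of that arithmetic in plain Scala follows. It assumes the column order shown in the tables in this issue, that each stage appends its output column in pipeline order, and that the feature column is called "features" (the real name is PortatilesModelConstants.featuresName, which is not shown here).

```scala
object PredictionIndexSketch {
  // Categorical input columns, in the order read by mleapToSparkDF (indices 0 to 8).
  val categorical: Seq[String] = Seq(
    "ESTADO_PRODUCTO", "SISTEMA_OPERATIVO", "CARGADOR", "PROCESADOR", "MARCA",
    "MODELO", "TIPO_TARJETA_GRAFICA", "MODELO_TARJETA_GRAFICA", "PANTALLA_TACTIL")

  // Remaining input columns (indices 9 to 15).
  val others: Seq[String] = Seq(
    "SSD_CAPACIDAD", "PULGADAS", "HDD_CAPACIDAD", "MEMORIA_RAM",
    "CAPACIDAD_TARJETA_GRAFICA", "TOTAL_PRODUCTO_VENTA", "idx")

  // Each categorical feature goes through StringIndexer, then OneHotEncoderEstimator.
  val stageOutputs: Seq[String] =
    categorical.flatMap(c => Seq(c + "_INDEX", c + "_VEC"))

  // "features" is an assumed name for the VectorAssembler output column.
  val allColumns: Seq[String] =
    categorical ++ others ++ stageOutputs ++ Seq("features", "prediction")

  val predictionIndex: Int = allColumns.indexOf("prediction")

  def main(args: Array[String]): Unit =
    println(s"prediction column index: $predictionIndex")
}
```

Looking the index up by column name in the transformed frame's schema, instead of hard-coding it, would keep the conversion correct if stages are later added or removed.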

Both the metrics and the predicted values obtained with testSetTransformed (Spark pipeline) and testSetTransformed2 (MLeap bundle) differ:

scala> val mae = buildEvaluator("mae").evaluate(testSetTransformed)
mae: Double = 47.37160734311079 
scala> val mae = buildEvaluator("mae").evaluate(testSetTransformed2)
mae: Double = 538.1780686303168

Here is a small sample of the test data, showing that the predictions differ:

+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+------------------+-----+
|ESTADO_PRODUCTO|SISTEMA_OPERATIVO|CARGADOR|   PROCESADOR|MARCA|     MODELO|TIPO_TARJETA_GRAFICA|MODELO_TARJETA_GRAFICA|PANTALLA_TACTIL|SSD_CAPACIDAD|PULGADAS|HDD_CAPACIDAD|MEMORIA_RAM|CAPACIDAD_TARJETA_GRAFICA|TOTAL_PRODUCTO_VENTA|        prediction|  idx|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+------------------+-----+
|              B|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        256.0|      13|          0.0|        8.0|                      0.0|              1099.0| 922.5587768554688| 4894|
|              B|       WINDOWS_10|      SI|INTEL_CORE_I5| ASUS|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|       1000.0|        8.0|                      0.0|               315.0|322.15277099609375| 2040|
|              C|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        128.0|      13|          0.0|        8.0|                      0.0|               765.0|   728.66357421875| 2927|
|              C|       WINDOWS_10|      SI|INTEL_CELERON| ACER|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|        500.0|        8.0|                      0.0|               215.0| 194.9210205078125|10422|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+------------------+-----+
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+-----------------+-----+
|ESTADO_PRODUCTO|SISTEMA_OPERATIVO|CARGADOR|   PROCESADOR|MARCA|     MODELO|TIPO_TARJETA_GRAFICA|MODELO_TARJETA_GRAFICA|PANTALLA_TACTIL|SSD_CAPACIDAD|PULGADAS|HDD_CAPACIDAD|MEMORIA_RAM|CAPACIDAD_TARJETA_GRAFICA|TOTAL_PRODUCTO_VENTA|       prediction|  idx|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+-----------------+-----+
|              B|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        256.0|      13|          0.0|        8.0|                      0.0|              1099.0|918.6906127929688| 4894|
|              B|       WINDOWS_10|      SI|INTEL_CORE_I5| ASUS|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|       1000.0|        8.0|                      0.0|               315.0|751.9229125976562| 2040|
|              C|           MAC_OS|      SI|INTEL_CORE_I5|APPLE|MACBOOK_PRO|          COMPARTIDA|                 OTROS|             NO|        128.0|      13|          0.0|        8.0|                      0.0|               765.0| 913.106201171875| 2927|
|              C|       WINDOWS_10|      SI|INTEL_CELERON| ACER|      OTROS|          COMPARTIDA|                 OTROS|             NO|          0.0|      15|        500.0|        8.0|                      0.0|               215.0| 838.892822265625|10422|
+---------------+-----------------+--------+-------------+-----+-----------+--------------------+----------------------+---------------+-------------+--------+-------------+-----------+-------------------------+--------------------+-----------------+-----+
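
As a sanity check on the sample rows, MAE is the mean of |label - prediction|. The sketch below recomputes it for the four rows shown, using TOTAL_PRODUCTO_VENTA as the label. It assumes the first table is the Spark output and the second the MLeap output, which is not stated explicitly above. Four rows cannot reproduce the full test-set metrics; they only show the direction and rough size of the gap.

```scala
object SampleMae {
  // Labels (TOTAL_PRODUCTO_VENTA) for idx 4894, 2040, 2927 and 10422.
  val labels: Seq[Double] = Seq(1099.0, 315.0, 765.0, 215.0)

  // "prediction" column of the first table (assumed to be the Spark pipeline).
  val firstTable: Seq[Double] =
    Seq(922.5587768554688, 322.15277099609375, 728.66357421875, 194.9210205078125)

  // "prediction" column of the second table (assumed to be the MLeap bundle).
  val secondTable: Seq[Double] =
    Seq(918.6906127929688, 751.9229125976562, 913.106201171875, 838.892822265625)

  // Mean absolute error over paired labels and predictions.
  def mae(labels: Seq[Double], predictions: Seq[Double]): Double = {
    require(labels.nonEmpty && labels.length == predictions.length,
      "labels and predictions must be non-empty and of equal length")
    labels.zip(predictions).map { case (y, p) => math.abs(y - p) }.sum / labels.length
  }

  def main(args: Array[String]): Unit = {
    println(f"MAE, first table:  ${mae(labels, firstTable)}%.2f")
    println(f"MAE, second table: ${mae(labels, secondTable)}%.2f")
  }
}
```

On these four rows the first table gives an MAE of about 60 and the second about 347. Only the first row (idx 4894) has nearly the same prediction in both tables; the other three differ by several hundred.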

Attached to this message you will find:

  • Training data set (mleaptraindata.csv)
  • Test set I used to generate the metrics shown above (mleaptestdata.csv)
  • Scala worksheet for pipeline generation (TestMlLeapSavingPipeline_prepared.sc)
  • Scala worksheet used to test the serialized pipeline (TestMlLeapReadingPipeline_prepared.sc)

I would very much appreciate any help you could give me.
Thanks a lot,
Irene
mleap_issue.zip

@talalryz
Contributor

@irene3030
There was an issue with the way MLeap handles sparse rows when predicting with XGBoost.
The pull request that fixes it (PR-205) was merged a couple of days ago.
Can you build master and check whether this is still an issue?
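
For readers wondering why sparse-row handling can change XGBoost predictions at all: XGBoost trees send a missing feature value down a learned default branch, while a present value is compared against the split threshold. If one code path treats the implicit zeros of a sparse vector as missing and another treats them as real 0.0 values, the same row can end up in different leaves. The toy sketch below shows that effect in plain Scala. Every name and number in it (Stump, the threshold, the leaf values) is made up for illustration. It is not MLeap or XGBoost code and does not claim to reproduce the exact bug fixed in the pull request.

```scala
object MissingVsZeroSketch {
  // One split: go left if value < threshold, otherwise right.
  // A missing value is not compared; it follows the default direction.
  final case class Stump(feature: Int, threshold: Double, defaultLeft: Boolean,
                         leftLeaf: Double, rightLeaf: Double) {
    def predict(row: Int => Option[Double]): Double = row(feature) match {
      case Some(v) => if (v < threshold) leftLeaf else rightLeaf
      case None    => if (defaultLeft) leftLeaf else rightLeaf
    }
  }

  // Sparse row: only non-zero entries are stored; feature 0 is absent.
  val sparse: Map[Int, Double] = Map(1 -> 8.0)

  // Reading A: absent entries are treated as missing.
  val absentAsMissing: Int => Option[Double] = i => sparse.get(i)

  // Reading B: absent entries are treated as real zeros (as after densifying).
  val absentAsZero: Int => Option[Double] = i => Some(sparse.getOrElse(i, 0.0))

  // Hypothetical split on feature 0 with made-up leaf values.
  val stump: Stump = Stump(feature = 0, threshold = 0.5, defaultLeft = false,
                           leftLeaf = 200.0, rightLeaf = 900.0)

  def main(args: Array[String]): Unit = {
    println(s"absent as missing: ${stump.predict(absentAsMissing)}")
    println(s"absent as zero:    ${stump.predict(absentAsZero)}")
  }
}
```

With one-hot encoded features most entries in each row are zero, and the training parameters in this issue set "missing" -> 0, so the distinction between zero and missing may be especially relevant here.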

@irene3030
Author

irene3030 commented Jan 16, 2020

Hello again,

First of all: thank you very much for your input and for fixing this issue.

I did exactly what you suggested: I downloaded the master branch (0.16.0-SNAPSHOT) and built the whole project. It worked like a charm! I no longer have the problem, and the predictions are now the same as the ones obtained using Spark.

One issue, FYI (in case anyone else runs into it): I had to package two of the modules manually, mleap-xgboost-spark (sbt mleap-xgboost-spark/package) and mleap-xgboost-runtime (sbt mleap-xgboost-runtime/package), because packaging from the root directory did not include them.

My colleagues and I are very grateful :)

@ancasarb
Member

ancasarb commented Jan 16, 2020

Great, thanks @talalryz for all your help!

Is it OK to close this issue? The changes will be included in the next release.

@lucagiovagnoli
Member

This is great, thanks @talalryz!

@talalryz
Contributor

We at Yelp had been struggling with this bug ourselves, so we're glad we could help others out along the way :)
