
Serializing a Spark ML Pipeline and Scoring with MLeap


This tutorial shows you how to use MLeap and Bundle.ML components to export a trained Spark ML Pipeline and use MLeap to transform new data without any dependency on Spark.

We will construct an ML Pipeline composed of a VectorAssembler, a Binarizer, PCA, and a Random Forest model for handwritten digit classification on the MNIST dataset.

The code for this tutorial is split up into two parts to demonstrate MLeap's independence from Spark:

  • Spark ML Pipeline Code: Vanilla/out-of-the-box Spark code to train the ML Pipeline, which we serialize to Bundle.ML
  • MLeap Code: Load the serialized Bundle into MLeap and transform Leap Frames

Before we begin, let's cover some terms:

Nouns

  • Estimator: A learning algorithm that is trained/fit against a DataFrame to produce a Model
  • Model: In Spark, a Model is the code and metadata needed to score new data with an already-trained algorithm
  • Transformer: Anything that transforms a DataFrame; it does not necessarily need to be trained by an Estimator (e.g. a Binarizer)
  • LeapFrame: MLeap's data frame structure, which stores your data along with its schema
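
For example, here is a minimal sketch of how these nouns relate in Spark ML, using StringIndexer (it assumes the dataset DataFrame loaded in the next section):

import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().            // Estimator: not yet trained
  setInputCol("label").
  setOutputCol("label_index")

val indexerModel = indexer.fit(dataset)       // Model: produced by fitting the Estimator
val indexed = indexerModel.transform(dataset) // the Model is also a Transformer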

Train a Spark Pipeline

Load the data

// Note that we are taking advantage of com.databricks:spark-csv package to load the data
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,IndexToString, Binarizer}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator}
import org.apache.spark.ml.{Pipeline,PipelineModel}  
import org.apache.spark.ml.feature.PCA

val datasetPath = "./mleap-demo/data/mnist/mnist_train.csv"
var dataset = spark.sqlContext.read.format("com.databricks.spark.csv").
                 option("header", "true").
                 option("inferSchema", "true").
                 load(datasetPath)
                 
val testDatasetPath = "./mleap-demo/data/mnist/mnist_test.csv"
var test = spark.sqlContext.read.format("com.databricks.spark.csv").
                 option("inferSchema", "true").
                 option("header", "true").
                 load(testDatasetPath)

Build the ML Data Pipeline

// Define Dependent and Independent Features
val predictionCol = "label"
val labels = Seq("0","1","2","3","4","5","6","7","8","9")  
val pixelFeatures = (0 until 784).map(x => s"x$x").toArray

// (Not used by the random forest pipeline below.)
val layers = Array[Int](pixelFeatures.length, 784, 800, labels.length)

val vector_assembler = new VectorAssembler()
  .setInputCols(pixelFeatures)
  .setOutputCol("features")

val stringIndexer = new StringIndexer()
  .setInputCol(predictionCol)
  .setOutputCol("label_index")
  .fit(dataset)
  
val binarizer = new Binarizer()  
  .setInputCol(vector_assembler.getOutputCol)
  .setThreshold(127.5)
  .setOutputCol("binarized_features")
  
val pca = new PCA().
  setInputCol(binarizer.getOutputCol).
  setOutputCol("pcaFeatures").
  setK(10)

val featurePipeline = new Pipeline().setStages(Array(vector_assembler, stringIndexer, binarizer, pca))

// Transform the raw data with the feature pipeline and persist it
val featureModel = featurePipeline.fit(dataset)

val datasetWithFeatures = featureModel.transform(dataset)

// Select only the data needed for training and persist it
val datasetPcaFeaturesOnly = datasetWithFeatures.select(stringIndexer.getOutputCol, pca.getOutputCol)
val datasetPcaFeaturesOnlyPersisted = datasetPcaFeaturesOnly.persist()
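
Optionally, take a quick look at the transformed data before training (standard Spark DataFrame calls):

datasetPcaFeaturesOnlyPersisted.printSchema()
datasetPcaFeaturesOnlyPersisted.show(5)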

We could make the random forest model part of the same pipeline; however, an existing bug (SPARK-16845) prevents us from doing that.

Train a Random Forest Model

// You can optionally experiment with CrossValidator and MulticlassClassificationEvaluator to determine optimal
// settings for the random forest

val rf = new RandomForestClassifier().
      setFeaturesCol(pca.getOutputCol).
      setLabelCol(stringIndexer.getOutputCol).
      setPredictionCol("prediction").
      setProbabilityCol("probability").
      setRawPredictionCol("raw_prediction")

val rfModel = rf.fit(datasetPcaFeaturesOnlyPersisted)
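
As the comment above suggests, you can optionally tune the forest with CrossValidator and MulticlassClassificationEvaluator (already imported above). A minimal sketch; the grid values are illustrative, not recommendations:

import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// "accuracy" requires Spark 2.x; use "f1" on older versions
val evaluator = new MulticlassClassificationEvaluator().
  setLabelCol(stringIndexer.getOutputCol).
  setPredictionCol("prediction").
  setMetricName("accuracy")

val paramGrid = new ParamGridBuilder().
  addGrid(rf.numTrees, Array(20, 50)).
  addGrid(rf.maxDepth, Array(5, 10)).
  build()

val cv = new CrossValidator().
  setEstimator(rf).
  setEvaluator(evaluator).
  setEstimatorParamMaps(paramGrid).
  setNumFolds(3)

val cvModel = cv.fit(datasetPcaFeaturesOnlyPersisted)
val bestRf = cvModel.bestModel.asInstanceOf[RandomForestClassificationModel]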

Serialize the ML Data Pipeline and RF Model to Bundle.ML

import org.apache.spark.ml.mleap.SparkUtil

// Combine the fitted feature pipeline and the trained random forest into a single PipelineModel
val pipeline = SparkUtil.createPipelineModel(uid = "pipeline", Array(featureModel, rfModel))

import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._

val modelFile = BundleFile("/tmp/mnist-spark-pipeline.zip")
pipeline.write.
  overwrite.
  name("simple-pipeline").
  save(modelFile)

modelFile.close()

Deserialize to MLeap and Score New Data

The goal of this step is to show how to deserialize a bundle and use it to score LeapFrames without any Spark dependencies.

import ml.combust.bundle.BundleFile
import ml.combust.mleap.MleapSupport._

// load the Spark pipeline we saved in the previous section
val bundle = BundleFile("/tmp/mnist-spark-pipeline.zip").load().get

Load the sample LeapFrame from the mleap-demo git repo (data/mnist.json)

import ml.combust.mleap.runtime.serialization.FrameReader

val s = scala.io.Source.fromFile("./mleap-demo/mnist.json").mkString

val bytes = s.getBytes("UTF-8")
// fromBytes returns a Try, so unwrap it with .get
val frame = FrameReader("ml.combust.mleap.json").fromBytes(bytes).get

// transform the dataframe using our pipeline
val mleapPipeline = bundle.root
val frame2 = mleapPipeline.transform(frame).get
val data = frame2.dataset
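
To pull the predicted digit indices back out of the transformed LeapFrame, you can select just the prediction column (a sketch; accessor names may vary slightly across MLeap runtime versions):

// select the prediction column and read its values
val predictions = frame2.select("prediction").get.dataset.map(_.getDouble(0))
println(predictions.take(5))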

What next? You can find more examples and notebooks in the mleap-demo repository.