Welcome! flink-jpmml
is a fresh-made library for Real-Time machine learning predictions built on top of
PMML standard models and Apache Flink
streaming engine.
flink-jpmml
is ease to use, running at serious scale, backend independent and naturally shaped to streaming
scenario.
In order to getting started, you only need
- any well-known version of a PMML model (3.2 or above)
- flink-jpmml is tested with the latest Flink (i.e. 1.3.1), but any working Apache Flink version (repo) should work properly.
-
if you employ sbt add the following dependecy to your project:
- Snapshot:
"io.radicalbit" %% "flink-jpmml-scala" % "0.6.0-SNAPSHOT"
- Stable:
"io.radicalbit" %% "flink-jpmml-scala" % "0.5.1"
- Snapshot:
-
For maven users instead:
- Snapshot
<dependencies> <dependency> <groupId>io.radicalbit</groupId> <artifactId>flink-jpmml-scala</artifactId> <version>0.6.0-SNAPSHOT</version> </dependency> </dependencies>
- Stable:
<dependencies> <dependency> <groupId>io.radicalbit</groupId> <artifactId>flink-jpmml-scala</artifactId> <version>0.5.1</version> </dependency> </dependencies>
Eventually, you can publish flink-jpmml
on your local repository. Then
- execute within the flink-jpmml root
> sbt
- select flink-jpmml-scala project
> project flink-jpmml-scala
- publish the library to your local repo
> publishLocal
Keep in mind you will need also Flink scala-core
flink-streaming
and flink-clients
libraries.
Lets start.
flink-jpmml
enables Flink users to execute real-time predictions based on machine learning models trained by any
system supporting the PMML
standard; this allows efficient streaming model serving along with the powerful Flink engine features.
Lets start with a concrete example.
Supposing you have your focused InputStream
and you want to score related data
import org.apache.flink.streaming.api.scala._
case class InputEvent(data: Array)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val events: DataStream[InputEvent] = env.yourInputStream
So you can achieve it easily with the following
// This will be all that you need
import io.radicalbit.flink.pmml.scala._
import org.apache.flink.ml.math.Vector
import org.apache.flink.streaming.api.scala._
object FlinkJpmmlExample {
def main(args: Array[String]): Unit = {
// your model can reside in any Flink supported backend
val pathToPmml = "/even/to/distributed/systems"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val events = env.yourInputStream
// a lazy reader implementation
val reader = ModelReader(pathToPmml)
// lets go with predictions
events.evaluate(reader) { (event, model) =>
// FlinkML Vector abstraction is used
val toBePredicted = vectorize(event)
// and here we are
val prediction: Prediction = model.predict(toBePredicted)
val result = prediction.value
// finally custom returns
(event, result)
}
env.execute("Awesome predictions with flink-jpmml")
}
}
Some useful insights from the code:
- in order to load the PMML model, you need to specify only the PMML source path
ModelReader
is a lazy reader and it provides the right reading abstraction to TaskManagers- The resulting
PMMLModel
will be loaded by once factory for each TaskManager running on your architecture at construction time - the
PmmlModel.predict
method expects Flink Vectors as input event and, if you want to manage NaNs, an optional replace value; Prediction
provides the result of the input event evaluation against the PMML model as aPrediction[Double]
ADT, so if the model can't manage a prediction it will return aEmptyPrediction
value.
flink-jpmml
provides also a quick prediction function if it can run against a Stream of Flink Vectors
...
val vectorStream: DataStream[Vector] = events.map(event => vectorize(event))
val predictions: DataStream[Prediction, Vector] = vectorStream.evaluate(reader)
predictions.map(_.target).print()
env.execute("flink-jpmml quick predictions")
These really basic examples show you how to interact with the library. The following sections try to address some interesting details which worth a deeper analysis.
flink-jpmml
main effort is to retain all the streaming concepts:
- since Flink is able to run against several distributed backend, users need to specify only the PMML source path: the library will take care how to load the model in full compliance of the underlying distributed system (e.g. HDFS, Alluxio)
ModelReader
is the object implementing the previous behavior; it will provide the loading methods but will read it lazily, i.e. only when the transformation will be applied- The
PMMLModel
will be loaded by a singleton model factory for each TaskManager running on your architecture; that means if you have an active TaskManager A made up of 4 TaskSlots, your TM will load the model from a single loader entity; this is crucial in order to let the system scale in thread-safety (still simple PMML models can grow to several hundreds of MegaBytes proportionally to the model size, meaning a big load in memory terms) - the
PmmlModel.predict
method expects Flink Vectors as input events; this choice let us to leverage the underlying Breeze implementation and no reflection will be applied at all; moreover, the user don't have to specify any key-value structure: you have data matching a feature vector, so the former will be used against the latter; (see input discussion section for further details) flink-jpmml
can also handle sparse data, thus you can just pass the desired replace value as argument to the discussed method (here you will need a SparseVector)val prediction = model.predict(sparseData, replaceValue)
Prediction
is the output case class: it returns the result of the input event evaluation against the PMML model as aScore[Double]
ADT, so if the model can't manage a prediction it will return aEmptyScore
.
The design worths bit more focus: the choice to have a UDF as input prediction method is justified by the need of handling a Machine Learning task (a prediction task) along with a pure Streaming application; in this way the user can manage predictions in the body of the function with the primitive event.
Assume this is the considered PMML feature vector
["sepal_width", "sepal_length", "petal_width", "petal_length"]
and you have values for all these fields; so, just create a DenseVector
val vector: DenseVector = DenseVector(value1, value2, value3, value4)
Suppose you missed value2, so you will need a SparseVector
val vector: SparseVector = SparseVector(4, Array(0, 2, 3), Array(value1, value2, value4))
flink-jpmml
will recognize missing values and it will replace them with replaceValue
if specified
(as second argument of the PmmlModel.predict
method), otherwise the NaNs handling is demanded to the PMML model.
Note also flink-jpmml
assumes that if you employ a DenseVector, it means that the dense instance size is your model
size and it will not predict anything (i.e. returns Score.Empty
); in case of sparse instances,
the library reads the sparse size value.
flink-jpmml
won't break your job if something goes wrong but the model loading step; it is such a crucial
action (it's mandatory for predictions), then in case of failure it raises a ModelLoadingException
.
Each other issue is detailed as log messages.
The handled failures are:
- Input Validation failure - The input is not corresponding to the feature vector in size terms (either too big or too short)
- Input Preparation failure - JPMML library fails to prepare internal data format
- Evaluation step failure - JPMML fails to evaluate the input against the PMML
- JPMML Result Extraction failure - The job fails to retrieve the result from the PMML model
If you want to contribute to the project send an email to info@radicalbit.io or just open an issue
here. flink-jpmml
community is looking for you!
- Andrea Spina - andrea.spina@radicalbit.io @spi-x-i
- Francesco Frontera - francesco.frontera@radicalbit.io @francescofrontera
- Simone Robutti - Initial prototype simone.robutti@gmail.com @chobeat
- Stefano Baghino - Initial prototype @stefanobaghino
- Apache®, Apache Flink™, Flink™, and the Apache feather logo are trademarks of The Apache Software Foundation.
- PMML standard is a trademark of The Data Mining Group. All rights reserved.
- JPMML-Evaluator is licensed under the GNU Affero General Public License (AGPL) version 3.0. Other licenses are available on request.
This library has been published under the GNU Affero General Public License (AGPL) version 3.0 following and respecting the official advices coming from the Apache Software Foundation about the compatibility between the Apache License, Version 2.0 and the GNU General Public License, Version 3.0
THIS STANDARD IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS RELEASE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.