From 0a40d6d4e1ae64fa83b55f79d23fd193bd5d7af0 Mon Sep 17 00:00:00 2001 From: Jason Wang Date: Sat, 4 Jun 2022 10:53:27 -0700 Subject: [PATCH] feat: add support for SparkML KMeansModel conversion (#556) * feat: add support for SparkML KMeansModel conversion Signed-off-by: Jason Wang * lowering div/add operator to version 13 Signed-off-by: Jason Wang * fixing comments Signed-off-by: Jason Wang * removing default op_domain Signed-off-by: Jason Wang * fixing unit test breaks Signed-off-by: Jason Wang * requiring target opset to be >= 7 in order to use broadcast in add and div Signed-off-by: Jason Wang --- .../sparkml/operator_converters/__init__.py | 1 + .../sparkml/operator_converters/k_means.py | 180 ++++++++++++++++++ .../convert/sparkml/ops_input_output.py | 4 + onnxmltools/convert/sparkml/ops_names.py | 5 +- tests/sparkml/test_k_means.py | 79 ++++++++ 5 files changed, 268 insertions(+), 1 deletion(-) create mode 100644 onnxmltools/convert/sparkml/operator_converters/k_means.py create mode 100644 tests/sparkml/test_k_means.py diff --git a/onnxmltools/convert/sparkml/operator_converters/__init__.py b/onnxmltools/convert/sparkml/operator_converters/__init__.py index 0c1df0e95..ab8b67294 100644 --- a/onnxmltools/convert/sparkml/operator_converters/__init__.py +++ b/onnxmltools/convert/sparkml/operator_converters/__init__.py @@ -32,4 +32,5 @@ from . import linear_classifier from . import onehot_encoder from . import vector_assembler +from . 
import k_means diff --git a/onnxmltools/convert/sparkml/operator_converters/k_means.py b/onnxmltools/convert/sparkml/operator_converters/k_means.py new file mode 100644 index 000000000..45909394e --- /dev/null +++ b/onnxmltools/convert/sparkml/operator_converters/k_means.py @@ -0,0 +1,180 @@ +from ...common._registration import register_converter, register_shape_calculator +from ...common.data_types import Int64TensorType, FloatTensorType +from ...common.utils import check_input_and_output_numbers, check_input_and_output_types +from ...common._topology import Operator, Scope, ModelComponentContainer +from ....proto import onnx_proto +from pyspark.ml.clustering import KMeansModel +from typing import List +import numpy as np + +def convert_sparkml_k_means_model(scope: Scope, operator: Operator, container: ModelComponentContainer): + if container.target_opset < 7: + raise NotImplementedError("Converting to ONNX for KMeansModel is not supported in opset < 7") + + op: KMeansModel = operator.raw_operator + centers: np.ndarray = np.vstack(op.clusterCenters()) + + K = centers.shape[0] # number of clusters + C = operator.inputs[0].type.shape[1] # Number of features from input + + if centers.shape[1] != C: + raise ValueError(f"Number of features {centers.shape[1]} in input does not match number of features in centers {C}") + + # [K x C] + centers_variable_name = scope.get_unique_variable_name("centers") + container.add_initializer( + centers_variable_name, + onnx_proto.TensorProto.FLOAT, + centers.shape, + centers.flatten().astype(np.float32) + ) + + distance_output_variable_name = scope.get_unique_variable_name("distance_output") + + if op.getDistanceMeasure() == "euclidean": + # [1 x K] + centers_row_squared_sum_variable_name = scope.get_unique_variable_name("centers_row_squared_sum") + centers_row_squared_sum = np.sum(centers**2,axis=-1).flatten().astype(np.float32) + container.add_initializer( + centers_row_squared_sum_variable_name, + onnx_proto.TensorProto.FLOAT, + [1, 
K], + centers_row_squared_sum + ) + + # input_row_squared_sum: [N x 1] + input_row_squared_sum_variable_name = scope.get_unique_variable_name("input_row_squared_sum") + reduce_sum_square_attrs = { + "name": scope.get_unique_operator_name("input_row_squared_sum"), + "axes": [1], + "keepdims": 1, + } + container.add_node( + op_type="ReduceSumSquare", + inputs=[operator.inputs[0].full_name], + outputs=[input_row_squared_sum_variable_name], + **reduce_sum_square_attrs + ) + + # -2 * input * Transpose(Center) + input_row_squared_sum: [N x K] + gemm_output_variable_name = scope.get_unique_variable_name("gemm_output") + gemm_attrs = { + "name": scope.get_unique_operator_name("GeMM"), + "alpha": -2.0, + "beta": 1.0, + "transB": 1, + } + container.add_node( + op_type="Gemm", + inputs=[operator.inputs[0].full_name, centers_variable_name, input_row_squared_sum_variable_name], + outputs=[gemm_output_variable_name], + op_version=7, + **gemm_attrs + ) + + # Euclidean Distance Squared = input_row_squared_sum - 2 * input * Transpose(Center) + Transpose(centers_row_squared_sum) + # [N x K] + container.add_node( + op_type="Add", + inputs=[gemm_output_variable_name, centers_row_squared_sum_variable_name], + outputs=[distance_output_variable_name], + op_version=7, + ) + elif op.getDistanceMeasure() == "cosine": + # centers_row_norm2: [1 x K] + centers_row_norm2_variable_name = scope.get_unique_variable_name("centers_row_norm2") + centers_row_norm2 = np.linalg.norm(centers, ord = 2, axis=1).flatten().astype(np.float32) + container.add_initializer( + centers_row_norm2_variable_name, + onnx_proto.TensorProto.FLOAT, + [1, K], + centers_row_norm2 + ) + + # input_row_norm2: [N x 1] + input_row_norm2_variable_name = scope.get_unique_variable_name("input_row_norm2") + reduce_l2_attrs = { + "name": scope.get_unique_operator_name("input_row_norm2"), + "axes": [1], + "keepdims": 1, + } + container.add_node( + op_type="ReduceL2", + inputs=[operator.inputs[0].full_name], + 
outputs=[input_row_norm2_variable_name], + **reduce_l2_attrs + ) + + # input * Transpose(Center): [N x K] + zeros_variable_name = scope.get_unique_variable_name("zeros") + container.add_initializer( + zeros_variable_name, + onnx_proto.TensorProto.FLOAT, + [1, K], + np.zeros([1, K]).flatten().astype(np.float32) + ) + gemm_output_variable_name = scope.get_unique_variable_name("gemm_output") + gemm_attrs = { + "name": scope.get_unique_operator_name("GeMM"), + "alpha": 1.0, + "transB": 1, + } + container.add_node( + op_type="Gemm", + inputs=[operator.inputs[0].full_name, centers_variable_name, zeros_variable_name], + outputs=[gemm_output_variable_name], + op_version=7, + **gemm_attrs + ) + + # Cosine similarity = gemm_output / input_row_norm2 / centers_row_norm2: [N x K] + div_output_variable_name = scope.get_unique_variable_name("div_output") + container.add_node( + op_type="Div", + inputs=[gemm_output_variable_name, input_row_norm2_variable_name], + outputs=[div_output_variable_name], + op_version=7, + ) + cosine_similarity_output_variable_name = scope.get_unique_variable_name("cosine_similarity_output") + container.add_node( + op_type="Div", + inputs=[div_output_variable_name, centers_row_norm2_variable_name], + outputs=[cosine_similarity_output_variable_name], + op_version=7, + ) + + # Cosine distance - 1 = -Cosine similarity: [N x K] + container.add_node( + op_type="Neg", + inputs=[cosine_similarity_output_variable_name], + outputs=[distance_output_variable_name], + ) + else: + raise ValueError(f"Distance measure {op.getDistanceMeasure()} not supported") + + # ArgMin(distance): [N] + argmin_attrs = { + "axis": 1, + "keepdims": 0, + } + container.add_node( + op_type="ArgMin", + inputs=[distance_output_variable_name], + outputs=[operator.outputs[0].full_name], + **argmin_attrs + ) + +register_converter('pyspark.ml.clustering.KMeansModel', convert_sparkml_k_means_model) + + +def calculate_k_means_model_output_shapes(operator: Operator): + 
check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1) + check_input_and_output_types(operator, good_input_types=[FloatTensorType]) + if len(operator.inputs[0].type.shape) != 2: + raise RuntimeError('Input must be a [N, C]-tensor') + + N = operator.inputs[0].type.shape[0] + operator.outputs[0].type = Int64TensorType(shape=[N]) + + +register_shape_calculator('pyspark.ml.clustering.KMeansModel', calculate_k_means_model_output_shapes) diff --git a/onnxmltools/convert/sparkml/ops_input_output.py b/onnxmltools/convert/sparkml/ops_input_output.py index 8287da298..7837d203b 100644 --- a/onnxmltools/convert/sparkml/ops_input_output.py +++ b/onnxmltools/convert/sparkml/ops_input_output.py @@ -164,6 +164,10 @@ def build_io_name_map(): "pyspark.ml.feature.VectorAssembler": ( lambda model: model.getOrDefault("inputCols"), lambda model: [model.getOrDefault("outputCol")] + ), + "pyspark.ml.clustering.KMeansModel": ( + lambda model: [model.getOrDefault("featuresCol")], + lambda model: [model.getOrDefault("predictionCol")] ) } return map diff --git a/onnxmltools/convert/sparkml/ops_names.py b/onnxmltools/convert/sparkml/ops_names.py index 2ef918e02..645501a39 100644 --- a/onnxmltools/convert/sparkml/ops_names.py +++ b/onnxmltools/convert/sparkml/ops_names.py @@ -48,7 +48,7 @@ from pyspark.ml.regression import LinearRegressionModel from pyspark.ml.clustering import BisectingKMeans -from pyspark.ml.clustering import KMeans +from pyspark.ml.clustering import KMeansModel from pyspark.ml.clustering import GaussianMixture from pyspark.ml.clustering import LDA @@ -70,6 +70,9 @@ def build_sparkml_operator_name_map(): AFTSurvivalRegressionModel, DecisionTreeRegressionModel, GBTRegressionModel, GBTRegressionModel, GeneralizedLinearRegressionModel, IsotonicRegressionModel, LinearRegressionModel, RandomForestRegressionModel ]}) + res.update({k: "pyspark.ml.clustering." 
+ k.__name__ for k in [ + KMeansModel + ]}) return res diff --git a/tests/sparkml/test_k_means.py b/tests/sparkml/test_k_means.py new file mode 100644 index 000000000..c03ce40d5 --- /dev/null +++ b/tests/sparkml/test_k_means.py @@ -0,0 +1,79 @@ +# SPDX-License-Identifier: Apache-2.0 + +import sys +import unittest +import numpy +import pandas +from pyspark.ml.clustering import KMeans +from pyspark.ml.linalg import Vectors +from onnx.defs import onnx_opset_version +from onnxconverter_common.onnx_ex import DEFAULT_OPSET_NUMBER +from pyspark.ml import Pipeline +from onnxmltools import convert_sparkml +from onnxmltools.convert.common.data_types import FloatTensorType +from tests.sparkml.sparkml_test_utils import save_data_models, run_onnx_model, compare_results +from tests.sparkml import SparkMlTestCase + + +TARGET_OPSET = min(DEFAULT_OPSET_NUMBER, onnx_opset_version()) + + +class TestSparkmlKMeansModel(SparkMlTestCase): + + @unittest.skipIf(sys.version_info < (3, 8), + reason="pickle fails on python 3.7") + def test_model_k_means_euclidean(self): + """ + Testing ONNX conversion for a Pipeline of two Spark KMeansModels, one with distanceMeasure set to "euclidean" and one set to "cosine". 
+ """ + kmeans_euclidean = KMeans(k=3, distanceMeasure="euclidean", featuresCol="features_euclidean", predictionCol="prediction_euclidean") + kmeans_cosine = KMeans(k=3, distanceMeasure="cosine", featuresCol="features_cosine", predictionCol="prediction_cosine") + + data = self.spark.createDataFrame([ + (0, Vectors.dense([1.0, 3.1, -1.0]),Vectors.dense([1.0, 1.0, 1.0]),), + (1, Vectors.dense([1.1, 3.0, -1.1]),Vectors.dense([2.0, 2.0, 2.0]),), + (2, Vectors.dense([-3.0, 5.1, 9.0]),Vectors.dense([-1.0, 3.0, -5.0]),), + (3, Vectors.dense([-2.9, 4.9, 8.9]),Vectors.dense([-2.0, 6.0, -10.0]),), + (4, Vectors.dense([5.0, -3.5, 2.0]),Vectors.dense([1.0, -2.0, 4.0]),), + (5, Vectors.dense([5.1, -3.3, 2.1]),Vectors.dense([2.0, -4.0, 8.0]),), + ], ["id", "features_euclidean", "features_cosine"]) + + model = Pipeline(stages=[kmeans_euclidean, kmeans_cosine]).fit(data) + model_onnx = convert_sparkml( + model, + 'Sparkml KMeansModel', + [('features_euclidean', FloatTensorType([None, 3])), ('features_cosine', FloatTensorType([None, 3]))], + target_opset=TARGET_OPSET + ) + + self.assertTrue(model_onnx is not None) + self.assertTrue(model_onnx.graph.node is not None) + + # run the model + predicted = model.transform(data).toPandas() + + data_pd = data.toPandas() + data_np = { + "features_euclidean": data_pd.features_euclidean.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32), + "features_cosine": data_pd.features_cosine.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32), + } + + expected = { + "prediction_euclidean": numpy.asarray(predicted.prediction_euclidean.values), + "prediction_cosine": numpy.asarray(predicted.prediction_cosine.values), + } + + paths = save_data_models(data_np, expected, model, model_onnx, basename="SparkmlKMeansModel") + onnx_model_path = paths[-1] + + output_names = ['prediction_euclidean', 'prediction_cosine'] + output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path) + actual_output = 
dict(zip(output_names, output)) + + assert output_shapes[0] == [None] + assert output_shapes[1] == [None] + compare_results(expected["prediction_euclidean"], actual_output["prediction_euclidean"], decimal=5) + compare_results(expected["prediction_cosine"], actual_output["prediction_cosine"], decimal=5) + +if __name__ == "__main__": + unittest.main()