From 58be3ea4abb273a9ed2063db7dc60011c6baf154 Mon Sep 17 00:00:00 2001
From: jiamingy
Date: Fri, 16 Dec 2022 23:34:38 +0800
Subject: [PATCH] Refactor PySpark tests.

- Convert classifier tests to pytest tests.
- Replace hardcoded tests.
---
 .../test_with_spark/test_spark_local.py | 492 +++++++++---------
 1 file changed, 254 insertions(+), 238 deletions(-)

diff --git a/tests/test_distributed/test_with_spark/test_spark_local.py b/tests/test_distributed/test_with_spark/test_spark_local.py
index 6754bacc6a99..fa7bdd94fa78 100644
--- a/tests/test_distributed/test_with_spark/test_spark_local.py
+++ b/tests/test_distributed/test_with_spark/test_spark_local.py
@@ -1,9 +1,10 @@
 import glob
 import logging
 import random
+import tempfile
 import uuid
 from collections import namedtuple
-from typing import Generator
+from typing import Generator, Sequence, Type

 import numpy as np
 import pytest
@@ -248,6 +249,87 @@ def clf_with_weight(
    )


ClfData = namedtuple(
    "ClfData", ("cls_params", "cls_df_train", "cls_df_train_large", "cls_df_test")
)


@pytest.fixture
def clf_data(spark: SparkSession) -> Generator[ClfData, None, None]:
    cls_params = {"max_depth": 5, "n_estimators": 10, "scale_pos_weight": 4}

    X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
    y = np.array([0, 1])
    cl1 = xgb.XGBClassifier()
    cl1.fit(X, y)
    predt0 = cl1.predict(X)
    proba0: np.ndarray = cl1.predict_proba(X)
    cl2 = xgb.XGBClassifier(max_depth=5, n_estimators=10, scale_pos_weight=4)
    cl2.fit(X, y)
    predt1 = cl2.predict(X)
    proba1: np.ndarray = cl2.predict_proba(X)

    # convert np arrays to a PySpark dataframe
    cls_df_train_data = [
        (Vectors.dense(X[0, :]), int(y[0])),
        (Vectors.sparse(3, {1: float(X[1, 1]), 2: float(X[1, 2])}), int(y[1])),
    ]
    cls_df_train = spark.createDataFrame(cls_df_train_data, ["features", "label"])

    cls_df_train_large = spark.createDataFrame(
        cls_df_train_data * 100, ["features", "label"]
    )

    cls_df_test = spark.createDataFrame(
        [
            (
                Vectors.dense(X[0, :]),
                int(predt0[0]),
                proba0[0, :].tolist(),
                int(predt1[0]),
                proba1[0, :].tolist(),
            ),
            (
                Vectors.sparse(3, {1: 1.0, 2: 5.5}),
                int(predt0[1]),
                proba0[1, :].tolist(),
                int(predt1[1]),
                proba1[1, :].tolist(),
            ),
        ],
        [
            "features",
            "expected_prediction",
            "expected_probability",
            "expected_prediction_with_params",
            "expected_probability_with_params",
        ],
    )
    yield ClfData(cls_params, cls_df_train, cls_df_train_large, cls_df_test)


def assert_model_compatible(model: XGBModel, model_path: str) -> None:
    bst = xgb.Booster()
    path = glob.glob(f"{model_path}/**/model/part-00000", recursive=True)[0]
    bst.load_model(path)
    np.testing.assert_equal(
        np.array(model.get_booster().save_raw("json")), np.array(bst.save_raw("json"))
    )


def check_sub_dict_match(
    sub_dict: dict, whole_dict: dict, excluding_keys: Sequence[str]
) -> None:
    for k in sub_dict:
        if k not in excluding_keys:
            assert k in whole_dict, f"check on {k} failed"
            assert sub_dict[k] == whole_dict[k], f"check on {k} failed"


def get_params_map(params_kv: dict, estimator: Type) -> dict:
    return {getattr(estimator, k): v for k, v in params_kv.items()}


class TestPySparkLocal:
    def test_regressor_with_weight_eval(self, reg_with_weight: RegWithWeight) -> None:
        # with weight
@@ -350,10 +432,161 @@ def test_classifier_with_weight_eval(self, clf_with_weight: ClfWithWeight) -> No
        )

        for row in pred_result_with_weight_eval:
-            np.testing.assert_allclose(  # failed
+            np.testing.assert_allclose(
                row.probability,
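                # Predictions are computed in float32, so compare with
                # atol=1e-3 rather than exact equality.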
row.expected_prob_with_weight_and_eval, atol=1e-3 ) + def test_classifier_model_save_load(self, clf_data: ClfData) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + path = "file:" + tmpdir + clf = SparkXGBClassifier(**clf_data.cls_params) + model = clf.fit(clf_data.cls_df_train) + model.save(path) + loaded_model = SparkXGBClassifierModel.load(path) + assert model.uid == loaded_model.uid + for k, v in clf_data.cls_params.items(): + assert loaded_model.getOrDefault(k) == v + + pred_result = loaded_model.transform(clf_data.cls_df_test).collect() + for row in pred_result: + np.testing.assert_allclose( + row.probability, row.expected_probability_with_params, atol=1e-3 + ) + + with pytest.raises(AssertionError, match="Expected class name"): + SparkXGBRegressorModel.load(path) + + assert_model_compatible(model, tmpdir) + + def test_classifier_basic(self, clf_data: ClfData) -> None: + classifier = SparkXGBClassifier() + model = classifier.fit(clf_data.cls_df_train) + pred_result = model.transform(clf_data.cls_df_test).collect() + for row in pred_result: + np.testing.assert_equal(row.prediction, row.expected_prediction) + np.testing.assert_allclose( + row.probability, row.expected_probability, rtol=1e-3 + ) + + def test_classifier_with_params(self, clf_data: ClfData) -> None: + classifier = SparkXGBClassifier(**clf_data.cls_params) + all_params = dict( + **(classifier._gen_xgb_params_dict()), + **(classifier._gen_fit_params_dict()), + **(classifier._gen_predict_params_dict()), + ) + check_sub_dict_match( + clf_data.cls_params, all_params, excluding_keys=_non_booster_params + ) + + model = classifier.fit(clf_data.cls_df_train) + all_params = dict( + **(model._gen_xgb_params_dict()), + **(model._gen_fit_params_dict()), + **(model._gen_predict_params_dict()), + ) + check_sub_dict_match( + clf_data.cls_params, all_params, excluding_keys=_non_booster_params + ) + pred_result = model.transform(clf_data.cls_df_test).collect() + for row in pred_result: + np.testing.assert_equal(row.prediction, row.expected_prediction_with_params) + np.testing.assert_allclose( + row.probability, row.expected_probability_with_params, rtol=1e-3 + ) + + def test_classifier_model_pipeline_save_load(self, clf_data: ClfData) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + path = "file:" + tmpdir + classifier = SparkXGBClassifier() + pipeline = Pipeline(stages=[classifier]) + pipeline = pipeline.copy( + extra=get_params_map(clf_data.cls_params, classifier) + ) + model = pipeline.fit(clf_data.cls_df_train) + model.save(path) + + loaded_model = PipelineModel.load(path) + for k, v in clf_data.cls_params.items(): + assert loaded_model.stages[0].getOrDefault(k) == v + + pred_result = loaded_model.transform(clf_data.cls_df_test).collect() + for row in pred_result: + np.testing.assert_allclose( + row.probability, row.expected_probability_with_params, atol=1e-3 + ) + assert_model_compatible(model.stages[0], tmpdir) + + def test_classifier_with_cross_validator(self, clf_data: ClfData) -> None: + xgb_classifer = SparkXGBClassifier(n_estimators=1) + paramMaps = ParamGridBuilder().addGrid(xgb_classifer.max_depth, [1, 2]).build() + cvBin = CrossValidator( + estimator=xgb_classifer, + estimatorParamMaps=paramMaps, + evaluator=BinaryClassificationEvaluator(), + seed=1, + parallelism=4, + numFolds=2, + ) + cvBinModel = cvBin.fit(clf_data.cls_df_train_large) + cvBinModel.transform(clf_data.cls_df_test) + + def test_convert_to_sklearn_model_clf(self, clf_data: ClfData) -> None: + classifier = SparkXGBClassifier( + 
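            # Deliberately non-default values; the assertions below verify that
            # each one survives the booster-to-sklearn conversion.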
n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5 + ) + clf_model = classifier.fit(clf_data.cls_df_train) + + # Check that regardless of what booster, _convert_to_model converts to the + # correct class type + sklearn_classifier = classifier._convert_to_sklearn_model( + clf_model.get_booster().save_raw("json"), + clf_model.get_booster().save_config(), + ) + assert isinstance(sklearn_classifier, XGBClassifier) + assert sklearn_classifier.n_estimators == 200 + assert sklearn_classifier.missing == 2.0 + assert sklearn_classifier.max_depth == 3 + assert sklearn_classifier.get_params()["sketch_eps"] == 0.5 + + def test_classifier_array_col_as_feature(self, clf_data: ClfData) -> None: + train_dataset = clf_data.cls_df_train.withColumn( + "features", vector_to_array(spark_sql_func.col("features")) + ) + test_dataset = clf_data.cls_df_test.withColumn( + "features", vector_to_array(spark_sql_func.col("features")) + ) + classifier = SparkXGBClassifier() + model = classifier.fit(train_dataset) + + pred_result = model.transform(test_dataset).collect() + for row in pred_result: + np.testing.assert_equal(row.prediction, row.expected_prediction) + np.testing.assert_allclose( + row.probability, row.expected_probability, rtol=1e-3 + ) + + def test_classifier_with_feature_names_types_weights( + self, clf_data: ClfData + ) -> None: + classifier = SparkXGBClassifier( + feature_names=["a1", "a2", "a3"], + feature_types=["i", "int", "float"], + feature_weights=[2.0, 5.0, 3.0], + ) + model = classifier.fit(clf_data.cls_df_train) + model.transform(clf_data.cls_df_test).collect() + + def test_early_stop_param_validation(self, clf_data: ClfData) -> None: + classifier = SparkXGBClassifier(early_stopping_rounds=1) + with pytest.raises(ValueError, match="early_stopping_rounds"): + classifier.fit(clf_data.cls_df_train) + + def test_gpu_param_setting(self, clf_data: ClfData) -> None: + py_cls = SparkXGBClassifier(use_gpu=True) + train_params = py_cls._get_distributed_train_params(clf_data.cls_df_train) + assert train_params["tree_method"] == "gpu_hist" + class XgboostLocalTest(SparkTestCase): def setUp(self): @@ -406,60 +639,6 @@ def setUp(self): ], ) - # >>> X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]]) - # >>> y = np.array([0, 1]) - # >>> cl1 = xgboost.XGBClassifier() - # >>> cl1.fit(X, y) - # >>> cl1.predict(X) - # array([0, 0]) - # >>> cl1.predict_proba(X) - # array([[0.5, 0.5], - # [0.5, 0.5]], dtype=float32) - # >>> cl2 = xgboost.XGBClassifier(max_depth=5, n_estimators=10, scale_pos_weight=4) - # >>> cl2.fit(X, y) - # >>> cl2.predict(X) - # array([1, 1]) - # >>> cl2.predict_proba(X) - # array([[0.27574146, 0.72425854 ], - # [0.27574146, 0.72425854 ]], dtype=float32) - self.cls_params = {"max_depth": 5, "n_estimators": 10, "scale_pos_weight": 4} - - cls_df_train_data = [ - (Vectors.dense(1.0, 2.0, 3.0), 0), - (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1), - ] - self.cls_df_train = self.session.createDataFrame( - cls_df_train_data, ["features", "label"] - ) - self.cls_df_train_large = self.session.createDataFrame( - cls_df_train_data * 100, ["features", "label"] - ) - self.cls_df_test = self.session.createDataFrame( - [ - ( - Vectors.dense(1.0, 2.0, 3.0), - 0, - [0.5, 0.5], - 1, - [0.27574146, 0.72425854], - ), - ( - Vectors.sparse(3, {1: 1.0, 2: 5.5}), - 0, - [0.5, 0.5], - 1, - [0.27574146, 0.72425854], - ), - ], - [ - "features", - "expected_prediction", - "expected_probability", - "expected_prediction_with_params", - "expected_probability_with_params", - ], - ) - # kwargs test (using the above data, train, 
we get the same results) self.cls_params_kwargs = {"tree_method": "approx", "sketch_eps": 0.03} @@ -610,6 +789,22 @@ def assert_model_compatible(self, model: XGBModel, model_path: str): bst.load_model(path) self.assertEqual(model.get_booster().save_raw("json"), bst.save_raw("json")) + def test_convert_to_sklearn_model_reg(self) -> None: + regressor = SparkXGBRegressor( + n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5 + ) + reg_model = regressor.fit(self.reg_df_train) + + sklearn_regressor = regressor._convert_to_sklearn_model( + reg_model.get_booster().save_raw("json"), + reg_model.get_booster().save_config(), + ) + assert isinstance(sklearn_regressor, XGBRegressor) + assert sklearn_regressor.n_estimators == 200 + assert sklearn_regressor.missing == 2.0 + assert sklearn_regressor.max_depth == 3 + assert sklearn_regressor.get_params()["sketch_eps"] == 0.5 + def test_regressor_params_basic(self): py_reg = SparkXGBRegressor() self.assertTrue(hasattr(py_reg, "n_estimators")) @@ -665,11 +860,6 @@ def test_param_alias(self): ): SparkXGBClassifier(featuresCol="f1") - def test_gpu_param_setting(self): - py_cls = SparkXGBClassifier(use_gpu=True) - train_params = py_cls._get_distributed_train_params(self.cls_df_train) - assert train_params["tree_method"] == "gpu_hist" - @staticmethod def test_param_value_converter(): py_cls = SparkXGBClassifier(missing=np.float64(1.0), sketch_eps=np.float64(0.3)) @@ -691,16 +881,6 @@ def test_regressor_basic(self): np.isclose(row.prediction, row.expected_prediction, atol=1e-3) ) - def test_classifier_basic(self): - classifier = SparkXGBClassifier() - model = classifier.fit(self.cls_df_train) - pred_result = model.transform(self.cls_df_test).collect() - for row in pred_result: - self.assertEqual(row.prediction, row.expected_prediction) - self.assertTrue( - np.allclose(row.probability, row.expected_probability, rtol=1e-3) - ) - def test_multi_classifier(self): classifier = SparkXGBClassifier() model = classifier.fit(self.multi_cls_df_train) @@ -710,12 +890,6 @@ def test_multi_classifier(self): np.allclose(row.probability, row.expected_probability, rtol=1e-3) ) - def _check_sub_dict_match(self, sub_dist, whole_dict, excluding_keys): - for k in sub_dist: - if k not in excluding_keys: - self.assertTrue(k in whole_dict, f"check on {k} failed") - self.assertEqual(sub_dist[k], whole_dict[k], f"check on {k} failed") - def test_regressor_with_params(self): regressor = SparkXGBRegressor(**self.reg_params) all_params = dict( @@ -723,7 +897,7 @@ def test_regressor_with_params(self): **(regressor._gen_fit_params_dict()), **(regressor._gen_predict_params_dict()), ) - self._check_sub_dict_match( + check_sub_dict_match( self.reg_params, all_params, excluding_keys=_non_booster_params ) @@ -733,7 +907,7 @@ def test_regressor_with_params(self): **(model._gen_fit_params_dict()), **(model._gen_predict_params_dict()), ) - self._check_sub_dict_match( + check_sub_dict_match( self.reg_params, all_params, excluding_keys=_non_booster_params ) pred_result = model.transform(self.reg_df_test).collect() @@ -744,35 +918,6 @@ def test_regressor_with_params(self): ) ) - def test_classifier_with_params(self): - classifier = SparkXGBClassifier(**self.cls_params) - all_params = dict( - **(classifier._gen_xgb_params_dict()), - **(classifier._gen_fit_params_dict()), - **(classifier._gen_predict_params_dict()), - ) - self._check_sub_dict_match( - self.cls_params, all_params, excluding_keys=_non_booster_params - ) - - model = classifier.fit(self.cls_df_train) - all_params = dict( - 
**(model._gen_xgb_params_dict()), - **(model._gen_fit_params_dict()), - **(model._gen_predict_params_dict()), - ) - self._check_sub_dict_match( - self.cls_params, all_params, excluding_keys=_non_booster_params - ) - pred_result = model.transform(self.cls_df_test).collect() - for row in pred_result: - self.assertEqual(row.prediction, row.expected_prediction_with_params) - self.assertTrue( - np.allclose( - row.probability, row.expected_probability_with_params, rtol=1e-3 - ) - ) - def test_regressor_model_save_load(self): tmp_dir = self.get_local_tmp_dir() path = "file:" + tmp_dir @@ -797,40 +942,12 @@ def test_regressor_model_save_load(self): self.assert_model_compatible(model, tmp_dir) - def test_classifier_model_save_load(self): - tmp_dir = self.get_local_tmp_dir() - path = "file:" + tmp_dir - regressor = SparkXGBClassifier(**self.cls_params) - model = regressor.fit(self.cls_df_train) - model.save(path) - loaded_model = SparkXGBClassifierModel.load(path) - self.assertEqual(model.uid, loaded_model.uid) - for k, v in self.cls_params.items(): - self.assertEqual(loaded_model.getOrDefault(k), v) - - pred_result = loaded_model.transform(self.cls_df_test).collect() - for row in pred_result: - self.assertTrue( - np.allclose( - row.probability, row.expected_probability_with_params, atol=1e-3 - ) - ) - - with self.assertRaisesRegex(AssertionError, "Expected class name"): - SparkXGBRegressorModel.load(path) - - self.assert_model_compatible(model, tmp_dir) - - @staticmethod - def _get_params_map(params_kv, estimator): - return {getattr(estimator, k): v for k, v in params_kv.items()} - def test_regressor_model_pipeline_save_load(self): tmp_dir = self.get_local_tmp_dir() path = "file:" + tmp_dir regressor = SparkXGBRegressor() pipeline = Pipeline(stages=[regressor]) - pipeline = pipeline.copy(extra=self._get_params_map(self.reg_params, regressor)) + pipeline = pipeline.copy(extra=get_params_map(self.reg_params, regressor)) model = pipeline.fit(self.reg_df_train) model.save(path) @@ -847,44 +964,6 @@ def test_regressor_model_pipeline_save_load(self): ) self.assert_model_compatible(model.stages[0], tmp_dir) - def test_classifier_model_pipeline_save_load(self): - tmp_dir = self.get_local_tmp_dir() - path = "file:" + tmp_dir - classifier = SparkXGBClassifier() - pipeline = Pipeline(stages=[classifier]) - pipeline = pipeline.copy( - extra=self._get_params_map(self.cls_params, classifier) - ) - model = pipeline.fit(self.cls_df_train) - model.save(path) - - loaded_model = PipelineModel.load(path) - for k, v in self.cls_params.items(): - self.assertEqual(loaded_model.stages[0].getOrDefault(k), v) - - pred_result = loaded_model.transform(self.cls_df_test).collect() - for row in pred_result: - self.assertTrue( - np.allclose( - row.probability, row.expected_probability_with_params, atol=1e-3 - ) - ) - self.assert_model_compatible(model.stages[0], tmp_dir) - - def test_classifier_with_cross_validator(self): - xgb_classifer = SparkXGBClassifier(n_estimators=1) - paramMaps = ParamGridBuilder().addGrid(xgb_classifer.max_depth, [1, 2]).build() - cvBin = CrossValidator( - estimator=xgb_classifer, - estimatorParamMaps=paramMaps, - evaluator=BinaryClassificationEvaluator(), - seed=1, - parallelism=4, - numFolds=2, - ) - cvBinModel = cvBin.fit(self.cls_df_train_large) - cvBinModel.transform(self.cls_df_test) - def test_callbacks(self): from xgboost.callback import LearningRateScheduler @@ -1003,38 +1082,6 @@ def test_use_gpu_param(self): classifier = SparkXGBClassifier(use_gpu=True, tree_method="gpu_hist") classifier = 
SparkXGBClassifier(use_gpu=True) - def test_convert_to_sklearn_model(self): - classifier = SparkXGBClassifier( - n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5 - ) - clf_model = classifier.fit(self.cls_df_train) - - regressor = SparkXGBRegressor( - n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5 - ) - reg_model = regressor.fit(self.reg_df_train) - - # Check that regardless of what booster, _convert_to_model converts to the correct class type - sklearn_classifier = classifier._convert_to_sklearn_model( - clf_model.get_booster().save_raw("json"), - clf_model.get_booster().save_config(), - ) - assert isinstance(sklearn_classifier, XGBClassifier) - assert sklearn_classifier.n_estimators == 200 - assert sklearn_classifier.missing == 2.0 - assert sklearn_classifier.max_depth == 3 - assert sklearn_classifier.get_params()["sketch_eps"] == 0.5 - - sklearn_regressor = regressor._convert_to_sklearn_model( - reg_model.get_booster().save_raw("json"), - reg_model.get_booster().save_config(), - ) - assert isinstance(sklearn_regressor, XGBRegressor) - assert sklearn_regressor.n_estimators == 200 - assert sklearn_regressor.missing == 2.0 - assert sklearn_regressor.max_depth == 3 - assert sklearn_classifier.get_params()["sketch_eps"] == 0.5 - def test_feature_importances(self): reg1 = SparkXGBRegressor(**self.reg_params) model = reg1.fit(self.reg_df_train) @@ -1060,32 +1107,6 @@ def test_regressor_array_col_as_feature(self): np.isclose(row.prediction, row.expected_prediction, atol=1e-3) ) - def test_classifier_array_col_as_feature(self): - train_dataset = self.cls_df_train.withColumn( - "features", vector_to_array(spark_sql_func.col("features")) - ) - test_dataset = self.cls_df_test.withColumn( - "features", vector_to_array(spark_sql_func.col("features")) - ) - classifier = SparkXGBClassifier() - model = classifier.fit(train_dataset) - - pred_result = model.transform(test_dataset).collect() - for row in pred_result: - self.assertEqual(row.prediction, row.expected_prediction) - self.assertTrue( - np.allclose(row.probability, row.expected_probability, rtol=1e-3) - ) - - def test_classifier_with_feature_names_types_weights(self): - classifier = SparkXGBClassifier( - feature_names=["a1", "a2", "a3"], - feature_types=["i", "int", "float"], - feature_weights=[2.0, 5.0, 3.0], - ) - model = classifier.fit(self.cls_df_train) - model.transform(self.cls_df_test).collect() - def test_regressor_with_sparse_optim(self): regressor = SparkXGBRegressor(missing=0.0) model = regressor.fit(self.reg_df_sparse_train) @@ -1192,11 +1213,6 @@ def test_empty_partition(self): classifier = SparkXGBClassifier(num_workers=4, tree_method=tree_method) classifier.fit(data_trans) - def test_early_stop_param_validation(self): - classifier = SparkXGBClassifier(early_stopping_rounds=1) - with pytest.raises(ValueError, match="early_stopping_rounds"): - classifier.fit(self.cls_df_train) - def test_unsupported_params(self): with pytest.raises(ValueError, match="evals_result"): SparkXGBClassifier(evals_result={})
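
Reviewer note: the new clf_data fixture replaces the hardcoded
expected_probability constants (e.g. 0.27574146/0.72425854) by recomputing
them with single-node xgboost at fixture time. A minimal standalone sketch of
that pattern follows; the helper name check_against_single_node and the
dense-only training frame are illustrative, not part of this patch:

    import numpy as np
    import xgboost as xgb
    from pyspark.ml.linalg import Vectors
    from xgboost.spark import SparkXGBClassifier

    def check_against_single_node(spark, params: dict) -> None:
        # Expected values come from single-node xgboost rather than hardcoded
        # constants, so the check stays valid across xgboost releases.
        X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
        y = np.array([0, 1])
        expected = xgb.XGBClassifier(**params).fit(X, y).predict_proba(X)

        df = spark.createDataFrame(
            [(Vectors.dense(X[i, :]), int(y[i])) for i in range(2)],
            ["features", "label"],
        )
        model = SparkXGBClassifier(**params).fit(df)
        # collect() preserves row order for this tiny local frame; zip each
        # row's probability column against the single-node expectation.
        for row, exp in zip(model.transform(df).collect(), expected):
            np.testing.assert_allclose(row.probability, exp, atol=1e-3)

Run under an active SparkSession with xgboost>=1.7 installed; atol=1e-3
matches the tolerance used throughout the suite.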