Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PySpark XGBoost integration #8020

Merged
merged 74 commits into from
Jul 13, 2022
Merged

PySpark XGBoost integration #8020

merged 74 commits into from
Jul 13, 2022

Conversation

WeichenXu123
Copy link
Contributor

@WeichenXu123 WeichenXu123 commented Jun 21, 2022

Signed-off-by: Weichen Xu weichen.xu@databricks.com

PySpark XGBoost integration.

class SparkXGBRegressor(_SparkXGBEstimator):
    """
    SparkXGBRegressor is a PySpark ML estimator. It implements the XGBoost regression
    algorithm based on XGBoost python library, and it can be used in PySpark Pipeline
    and PySpark ML meta algorithms like CrossValidator/TrainValidationSplit/OneVsRest.

    SparkXGBRegressor automatically supports most of the parameters in
    `xgboost.XGBRegressor` constructor and most of the parameters used in
    `xgboost.XGBRegressor` fit and predict method (see `API docs <https://xgboost.readthedocs\
    .io/en/latest/python/python_api.html#xgboost.XGBRegressor>`_ for details).

    SparkXGBRegressor doesn't support setting `gpu_id` but support another param `use_gpu`,
    see doc below for more details.

    SparkXGBRegressor doesn't support setting `base_margin` explicitly as well, but support
    another param called `base_margin_col`. see doc below for more details.

    SparkXGBRegressor doesn't support following params:
      `gpu_id`, `enable_categorical`, `use_label_encoder`, `n_jobs`, `nthread`,
      `validate_features`, `output_margin`, `base_margin` param.

    callbacks:
        The export and import of the callback functions are at best effort.
        For details, see :py:attr:`xgboost.spark.SparkXGBRegressor.callbacks` param doc.
    missing:
        The parameter `missing` in SparkXGBRegressor has different semantics with
        that in `xgboost.XGBRegressor`. For details, see
        :py:attr:`xgboost.spark.SparkXGBRegressor.missing` param doc.
    validationIndicatorCol
        For params related to `xgboost.XGBRegressor` training
        with evaluation dataset's supervision, set
        :py:attr:`xgboost.spark.SparkXGBRegressor.validationIndicatorCol`
        parameter instead of setting the `eval_set` parameter in `xgboost.XGBRegressor`
        fit method.
    weightCol:
        To specify the weight of the training and validation dataset, set
        :py:attr:`xgboost.spark.SparkXGBRegressor.weightCol` parameter instead of setting
        `sample_weight` and `sample_weight_eval_set` parameter in `xgboost.XGBRegressor`
        fit method.
    xgb_model:
        Set the value to be the instance returned by
        :func:`xgboost.spark.SparkXGBRegressorModel.get_booster`.
    num_workers:
        Integer that specifies the number of XGBoost workers to use.
        Each XGBoost worker corresponds to one spark task.
    use_gpu:
        Boolean that specifies whether the executors are running on GPU
        instances.
    base_margin_col:
        To specify the base margins of the training and validation
        dataset, set :py:attr:`xgboost.spark.SparkXGBRegressor.base_margin_col` parameter
        instead of setting `base_margin` and `base_margin_eval_set` in the
        `xgboost.XGBRegressor` fit method. Note: this isn't available for distributed
        training.
    >>> from xgboost.spark import SparkXGBRegressor
    >>> from pyspark.ml.linalg import Vectors
    >>> df_train = spark.createDataFrame([
    ...     (Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
    ...     (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
    ...     (Vectors.dense(4.0, 5.0, 6.0), 2, True, 1.0),
    ...     (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 3, True, 2.0),
    ... ], ["features", "label", "isVal", "weight"])
    >>> df_test = spark.createDataFrame([
    ...     (Vectors.dense(1.0, 2.0, 3.0), ),
    ...     (Vectors.sparse(3, {1: 1.0, 2: 5.5}), )
    ... ], ["features"])
    >>> xgb_regressor = SparkXGBRegressor(max_depth=5, missing=0.0,
    ... validation_indicator_col='isVal', weight_col='weight',
    ... early_stopping_rounds=1, eval_metric='rmse')
    >>> xgb_reg_model = xgb_regressor.fit(df_train)
    >>> xgb_reg_model.transform(df_test)
class SparkXGBClassifier(_SparkXGBEstimator, HasProbabilityCol, HasRawPredictionCol):
    """
    SparkXGBClassifier is a PySpark ML estimator. It implements the XGBoost classification
    algorithm based on XGBoost python library, and it can be used in PySpark Pipeline
    and PySpark ML meta algorithms like CrossValidator/TrainValidationSplit/OneVsRest.

    SparkXGBClassifier automatically supports most of the parameters in
    `xgboost.XGBClassifier` constructor and most of the parameters used in
    `xgboost.XGBClassifier` fit and predict method (see `API docs <https://xgboost.readthedocs\
    .io/en/latest/python/python_api.html#xgboost.XGBClassifier>`_ for details).

    SparkXGBClassifier doesn't support setting `gpu_id` but support another param `use_gpu`,
    see doc below for more details.

    SparkXGBClassifier doesn't support setting `base_margin` explicitly as well, but support
    another param called `base_margin_col`. see doc below for more details.

    SparkXGBClassifier doesn't support setting `output_margin`, but we can get output margin
    from the raw prediction column. See `rawPredictionCol` param doc below for more details.

    SparkXGBClassifier doesn't support `validate_features` and `output_margin` param.

    Parameters
    ----------
    callbacks:
        The export and import of the callback functions are at best effort. For
        details, see :py:attr:`xgboost.spark.SparkXGBClassifier.callbacks` param doc.
    missing:
        The parameter `missing` in SparkXGBClassifier has different semantics with
        that in `xgboost.XGBClassifier`. For details, see
        :py:attr:`xgboost.spark.SparkXGBClassifier.missing` param doc.
    rawPredictionCol:
        The `output_margin=True` is implicitly supported by the
        `rawPredictionCol` output column, which is always returned with the predicted margin
        values.
    validationIndicatorCol:
        For params related to `xgboost.XGBClassifier` training with
        evaluation dataset's supervision,
        set :py:attr:`xgboost.spark.SparkXGBClassifier.validationIndicatorCol`
        parameter instead of setting the `eval_set` parameter in `xgboost.XGBClassifier`
        fit method.
    weightCol:
        To specify the weight of the training and validation dataset, set
        :py:attr:`xgboost.spark.SparkXGBClassifier.weightCol` parameter instead of setting
        `sample_weight` and `sample_weight_eval_set` parameter in `xgboost.XGBClassifier`
        fit method.
    xgb_model:
        Set the value to be the instance returned by
        :func:`xgboost.spark.SparkXGBClassifierModel.get_booster`.
    num_workers:
        Integer that specifies the number of XGBoost workers to use.
        Each XGBoost worker corresponds to one spark task.
    use_gpu:
        Boolean that specifies whether the executors are running on GPU
        instances.
    base_margin_col:
        To specify the base margins of the training and validation
        dataset, set :py:attr:`xgboost.spark.SparkXGBClassifier.base_margin_col` parameter
        instead of setting `base_margin` and `base_margin_eval_set` in the
        `xgboost.XGBClassifier` fit method. Note: this isn't available for distributed
        training.
    >>> from xgboost.spark import SparkXGBClassifier
    >>> from pyspark.ml.linalg import Vectors
    >>> df_train = spark.createDataFrame([
    ...     (Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
    ...     (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1, False, 2.0),
    ...     (Vectors.dense(4.0, 5.0, 6.0), 0, True, 1.0),
    ...     (Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, True, 2.0),
    ... ], ["features", "label", "isVal", "weight"])
    >>> df_test = spark.createDataFrame([
    ...     (Vectors.dense(1.0, 2.0, 3.0), ),
    ... ], ["features"])
    >>> xgb_classifier = SparkXGBClassifier(max_depth=5, missing=0.0,
    ...     validation_indicator_col='isVal', weight_col='weight',
    ...     early_stopping_rounds=1, eval_metric='logloss')
    >>> xgb_clf_model = xgb_classifier.fit(df_train)
    >>> xgb_clf_model.transform(df_test).show()

Summarize some important follow-ups:

  1. Apply DMatrix data iterator interface
  2. model summary / explanation
  3. Classifier uses CPU functions like softmax, how to address it ?
  4. Support XGB random forest classifer / regressor
  5. Support XGB ranker
  6. Support PySpark SparseVector (we need wait pyspark 3.4 out)
  7. Update tests, remove unittest dependency.
  8. Add type annotations for xgboost.spark APIs
  9. Add demo/guide-python and doc/tutorials
  10. Fix the macOS github action on python xgboost.spark.

I will take 3 / 6 / 7 / 9 / 10, other items I would like to assign them to other community guys.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Copy link
Member

@trivialfis trivialfis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for getting this started! I haven't looked into the details yet, for the initial work let's get some simple things like code style aligned first to avoid massive changes later.

  • Consider using the same document style as the rest of the library. (For instance, replace :param: notation).
  • Consider using the black formatter.

Please add a simple demonstration in demo/guide-python so that we can run it and gain a better intuition on how things work. A tutorial in doc/tutorials is also extremely welcomed!

I will attach detail design doc soon.

That would be great!

python-package/xgboost/spark/__init__.py Outdated Show resolved Hide resolved
python-package/xgboost/spark/data.py Outdated Show resolved Hide resolved
python-package/xgboost/spark/data.py Show resolved Hide resolved
@wbo4958
Copy link
Contributor

wbo4958 commented Jun 25, 2022

I was thinking how do we support categorical data in this PR? @trivialfis

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@trivialfis
Copy link
Member

trivialfis commented Jun 28, 2022

Quick note for followups:

  • We need to filter out the spark-specific parameters before passing them into XGBoost train function:
Parameters: { "baseMarginCol", "force_repartition" } might not be used.

  This could be a false alarm, with some parameters getting used by language bindings but
  then being mistakenly passed down to XGBoost core, or some parameter actually being used
  but getting flagged wrongly here. Please open an issue if you find any such cases.
  • The test should clean up themselves after finishing. Use tempfile for generating temporary directories.
  • Prediction is thread-safe in XGBoost.
  • Prediction doesn't need data concatenation. This can also remove specializations in data processing procedures.
  • We use pytest, unittest.TestCase is not needed.
  • Python2 compatibility code is not needed.
  • Not entirely sure about the extensive use of double. XGBoost returns f32 most of the time.
  • Classifier uses CPU functions like softmax.
  • xgb_model_creator unused for _fit_distributed.
  • DMatrix construction should happen inside rabit context.
  • Model summary.
  • Model explanation.

Only suggestions, we can improve upon the existing PR after the initial merge.

@wbo4958
Copy link
Contributor

wbo4958 commented Jun 29, 2022

another question, how to support the categorical data?

@trivialfis
Copy link
Member

another question, how to support the categorical data?

We will work on it later. I assume we will simply pickle the feature types to executors.

Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Weichen Xu <weichen.xu@databricks.com>
@WeichenXu123
Copy link
Contributor Author

@trivialfis @hcho3 What's the memory limitation for the github action machine for running "Test XGBoost Python package on macos-10.15" ?
I guess the github action machine might have too low memory which causes spark executor crash and causes the spark jobs hangs when running tests.

@WeichenXu123
Copy link
Contributor Author

@hcho3 Could you assign me the permission to kill/rerun github actions ? so that I can rerun test faster.

@hcho3
Copy link
Collaborator

hcho3 commented Jul 8, 2022

@WeichenXu123 Once your first PR is merged, you'll have permissions to run and restart tests. Right now, you have limited permissions as a first-time contributor.

Can we merge this PR without MacOS support for now?

@WeichenXu123
Copy link
Contributor Author

@WeichenXu123 Once your first PR is merged, you'll have permissions to run and restart tests. Right now, you have limited permissions as a first-time contributor.

Can we merge this PR without MacOS support for now?

We can. The CI hanging is very weird, local run I don't find any issue. Probably due to the CI machine resource limitation.

@WeichenXu123
Copy link
Contributor Author

@trivialfis @wbo4958 @hcho3
All issues fixed, is this PR ready to merge ?

@WeichenXu123 WeichenXu123 requested review from trivialfis and hcho3 July 11, 2022 02:38
@wbo4958
Copy link
Contributor

wbo4958 commented Jul 11, 2022

Thx @WeichenXu123 for your hard work, we're expecting to merge it ASAP. @trivialfis Would you help on this?

Copy link
Member

@trivialfis trivialfis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the excellent work on the pyspark interface! Will merge it once CI is fixed.

@WeichenXu123
Copy link
Contributor Author

Thank you for the excellent work on the pyspark interface! Will merge it once CI is fixed.

Thank you. @trivialfis There are some failure tests which is not related to my PR. Are they fixed in master branch ? If so I can merge master and rerun CI.

@hcho3
Copy link
Collaborator

hcho3 commented Jul 11, 2022

Please wait until the CI is fixed. ETA by tomorrow

@WeichenXu123
Copy link
Contributor Author

@hcho3 Is CI fixed ? Thank you!

@hcho3
Copy link
Collaborator

hcho3 commented Jul 13, 2022

@WeichenXu123 I restarted the tests. Hopefully we can merge this PR soon

@trivialfis trivialfis merged commit 176fec8 into dmlc:master Jul 13, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants