From 81851aed8e4996bebfe24fd5b6049e8d5e4261c7 Mon Sep 17 00:00:00 2001 From: fis Date: Tue, 26 May 2020 20:15:02 +0800 Subject: [PATCH 1/3] [dask] Return GPU dataframe when input is from GPU. --- python-package/xgboost/compat.py | 7 ------- python-package/xgboost/core.py | 9 ++++----- python-package/xgboost/dask.py | 11 ++++++++--- tests/python-gpu/test_gpu_with_dask.py | 18 ++++++++++++++++-- tests/python/testing.py | 8 +++++++- 5 files changed, 35 insertions(+), 18 deletions(-) diff --git a/python-package/xgboost/compat.py b/python-package/xgboost/compat.py index 3064d3501b6b..281db3bcb3ed 100644 --- a/python-package/xgboost/compat.py +++ b/python-package/xgboost/compat.py @@ -105,15 +105,8 @@ def lazy_isinstance(instance, module, name): # cudf try: - from cudf import DataFrame as CUDF_DataFrame - from cudf import Series as CUDF_Series from cudf import concat as CUDF_concat - CUDF_INSTALLED = True except ImportError: - CUDF_DataFrame = object - CUDF_Series = object - CUDF_MultiIndex = object - CUDF_INSTALLED = False CUDF_concat = None # sklearn diff --git a/python-package/xgboost/core.py b/python-package/xgboost/core.py index d701309a9824..f25886c3c36f 100644 --- a/python-package/xgboost/core.py +++ b/python-package/xgboost/core.py @@ -17,8 +17,7 @@ from .compat import ( STRING_TYPES, DataFrame, py_str, - PANDAS_INSTALLED, CUDF_INSTALLED, - CUDF_DataFrame, + PANDAS_INSTALLED, os_fspath, os_PathLike, lazy_isinstance) from .libpath import find_lib_path @@ -282,8 +281,8 @@ def _convert_unknown_data(data, meta=None, meta_type=None): # Either object has cuda array interface or contains columns with interfaces def _has_cuda_array_interface(data): - return hasattr(data, '__cuda_array_interface__') or ( - CUDF_INSTALLED and isinstance(data, CUDF_DataFrame)) + return hasattr(data, '__cuda_array_interface__') or \ + lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame') def _cudf_array_interfaces(df): @@ -508,7 +507,7 @@ def set_uint_info(self, field, data): def 
set_interface_info(self, field, data): """Set info type property into DMatrix.""" # If we are passed a dataframe, extract the series - if CUDF_INSTALLED and isinstance(data, CUDF_DataFrame): + if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'): if len(data.columns) != 1: raise ValueError( 'Expecting meta-info to contain a single column') diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 1cc15d744e82..7725a463425c 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -25,7 +25,7 @@ from .compat import da, dd, delayed, get_client from .compat import sparse, scipy_sparse from .compat import PANDAS_INSTALLED, DataFrame, Series, pandas_concat -from .compat import CUDF_INSTALLED, CUDF_DataFrame, CUDF_Series, CUDF_concat +from .compat import CUDF_concat from .compat import lazy_isinstance from .core import DMatrix, Booster, _expect @@ -97,7 +97,8 @@ def concat(value): # pylint: disable=too-many-return-statements return sparse.concatenate(value, axis=0) if PANDAS_INSTALLED and isinstance(value[0], (DataFrame, Series)): return pandas_concat(value, axis=0) - if CUDF_INSTALLED and isinstance(value[0], (CUDF_DataFrame, CUDF_Series)): + if lazy_isinstance(value[0], 'cudf.core.dataframe', 'DataFrame') or \ + lazy_isinstance(value[0], 'cudf.core.series', 'Series'): return CUDF_concat(value, axis=0) if lazy_isinstance(value[0], 'cupy.core.core', 'ndarray'): import cupy # pylint: disable=import-error @@ -505,7 +506,11 @@ def mapped_predict(partition, is_df): m = DMatrix(partition, missing=missing, nthread=worker.nthreads) predt = booster.predict(m, *args, validate_features=False) if is_df: - predt = DataFrame(predt, columns=['prediction']) + if lazy_isinstance(partition, 'cudf.core.dataframe', 'DataFrame'): + import cudf + predt = cudf.DataFrame(predt, columns=['prediction']) + else: + predt = DataFrame(predt, columns=['prediction']) return predt if isinstance(data, da.Array): diff --git
a/tests/python-gpu/test_gpu_with_dask.py b/tests/python-gpu/test_gpu_with_dask.py index 7cc45a428459..97eeb5bdf30f 100644 --- a/tests/python-gpu/test_gpu_with_dask.py +++ b/tests/python-gpu/test_gpu_with_dask.py @@ -44,10 +44,10 @@ def test_dask_dataframe(self): out = dxgb.train(client, {'tree_method': 'gpu_hist'}, dtrain=dtrain, evals=[(dtrain, 'X')], - num_boost_round=2) + num_boost_round=4) assert isinstance(out['booster'], dxgb.Booster) - assert len(out['history']['X']['rmse']) == 2 + assert len(out['history']['X']['rmse']) == 4 predictions = dxgb.predict(client, out, dtrain).compute() assert isinstance(predictions, np.ndarray) @@ -62,6 +62,20 @@ def test_dask_dataframe(self): cupy.testing.assert_allclose(single_node, predictions) cupy.testing.assert_allclose(single_node, series_predictions) + predt = dxgb.predict(client, out, X) + assert isinstance(predt, dd.Series) + + def is_df(part): + assert isinstance(part, cudf.DataFrame), part + return part + + predt.map_partitions( + is_df, + meta=dd.utils.make_meta({'prediction': 'f4'})) + + cupy.testing.assert_allclose( + predt.values.compute(), single_node) + @pytest.mark.skipif(**tm.no_cupy()) @pytest.mark.mgpu def test_dask_array(self): diff --git a/tests/python/testing.py b/tests/python/testing.py index 708e5af4ca55..07b0f5b04d27 100644 --- a/tests/python/testing.py +++ b/tests/python/testing.py @@ -1,6 +1,6 @@ # coding: utf-8 from xgboost.compat import SKLEARN_INSTALLED, PANDAS_INSTALLED -from xgboost.compat import CUDF_INSTALLED, DASK_INSTALLED +from xgboost.compat import DASK_INSTALLED def no_sklearn(): @@ -46,6 +46,12 @@ def no_dask_cuda(): def no_cudf(): + try: + import cudf # noqa + CUDF_INSTALLED = True + except ImportError: + CUDF_INSTALLED = False + return {'condition': not CUDF_INSTALLED, 'reason': 'CUDF is not installed'} From 3b82008abbd9d702374698ef54661d702d4d2198 Mon Sep 17 00:00:00 2001 From: fis Date: Tue, 26 May 2020 21:03:21 +0800 Subject: [PATCH 2/3] Refactor.
--- python-package/xgboost/dask.py | 51 +++++++++++++++------------------- 1 file changed, 22 insertions(+), 29 deletions(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 7725a463425c..904a7bee3e1a 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -462,6 +462,23 @@ def dispatched_train(worker_addr): return list(filter(lambda ret: ret is not None, results))[0] +def _direct_predict_impl(client, data, predict_fn): + if isinstance(data, da.Array): + predictions = client.submit( + da.map_blocks, + predict_fn, data, False, drop_axis=1, + dtype=numpy.float32 + ).result() + return predictions + if isinstance(data, dd.DataFrame): + predictions = client.submit( + dd.map_partitions, + predict_fn, data, True, + meta=dd.utils.make_meta({'prediction': 'f4'}) + ).result() + return predictions.iloc[:, 0] + + def predict(client, model, data, *args, missing=numpy.nan): '''Run prediction with a trained booster. @@ -503,30 +520,19 @@ def predict(client, model, data, *args, missing=numpy.nan): def mapped_predict(partition, is_df): worker = distributed_get_worker() + booster.set_param({'nthread': worker.nthreads}) m = DMatrix(partition, missing=missing, nthread=worker.nthreads) predt = booster.predict(m, *args, validate_features=False) if is_df: if lazy_isinstance(partition, 'cudf.core.dataframe', 'DataFrame'): - import cudf + import cudf # pylint: disable=import-error predt = cudf.DataFrame(predt, columns=['prediction']) else: predt = DataFrame(predt, columns=['prediction']) return predt - if isinstance(data, da.Array): - predictions = client.submit( - da.map_blocks, - mapped_predict, data, False, drop_axis=1, - dtype=numpy.float32 - ).result() - return predictions - if isinstance(data, dd.DataFrame): - predictions = client.submit( - dd.map_partitions, - mapped_predict, data, True, - meta=dd.utils.make_meta({'prediction': 'f4'}) - ).result() - return predictions.iloc[:, 0] + if isinstance(data, (da.Array,
dd.DataFrame)): + return _direct_predict_impl(client, data, mapped_predict) # Prediction on dask DMatrix. worker_map = data.worker_map @@ -649,20 +655,7 @@ def mapped_predict(data, is_df): dtype=numpy.float32) return prediction - if isinstance(data, da.Array): - predictions = client.submit( - da.map_blocks, - mapped_predict, data, False, drop_axis=1, - dtype=numpy.float32 - ).result() - return predictions - if isinstance(data, dd.DataFrame): - predictions = client.submit( - dd.map_partitions, - mapped_predict, data, True, - meta=dd.utils.make_meta({'prediction': 'f4'}) - ).result() - return predictions.iloc[:, 0] + return _direct_predict_impl(client, data, mapped_predict) def _evaluation_matrices(client, validation_set, sample_weights, missing): From 6244b2ea824dec51d6d209ab266a55b0220d0dea Mon Sep 17 00:00:00 2001 From: fis Date: Tue, 26 May 2020 21:08:17 +0800 Subject: [PATCH 3/3] TypeError. --- python-package/xgboost/dask.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 904a7bee3e1a..5e7e8624fdbf 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -477,6 +477,8 @@ def _direct_predict_impl(client, data, predict_fn): meta=dd.utils.make_meta({'prediction': 'f4'}) ).result() return predictions.iloc[:, 0] + raise TypeError('data of type: ' + str(type(data)) + + ' is not supported by direct prediction') def predict(client, model, data, *args, missing=numpy.nan):