Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dask] Return GPU Series when input is from cuDF. #5710

Merged
merged 3 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions python-package/xgboost/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,8 @@ def lazy_isinstance(instance, module, name):

# cudf
try:
from cudf import DataFrame as CUDF_DataFrame
from cudf import Series as CUDF_Series
from cudf import concat as CUDF_concat
CUDF_INSTALLED = True
except ImportError:
CUDF_DataFrame = object
CUDF_Series = object
CUDF_MultiIndex = object
CUDF_INSTALLED = False
CUDF_concat = None

# sklearn
Expand Down
9 changes: 4 additions & 5 deletions python-package/xgboost/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

from .compat import (
STRING_TYPES, DataFrame, py_str,
PANDAS_INSTALLED, CUDF_INSTALLED,
CUDF_DataFrame,
PANDAS_INSTALLED,
os_fspath, os_PathLike, lazy_isinstance)
from .libpath import find_lib_path

Expand Down Expand Up @@ -282,8 +281,8 @@ def _convert_unknown_data(data, meta=None, meta_type=None):

# Either object has cuda array interface or contains columns with interfaces
def _has_cuda_array_interface(data):
    """Return True when ``data`` lives on the GPU.

    True if the object either exposes ``__cuda_array_interface__`` directly
    (e.g. cupy arrays) or is a cuDF DataFrame, whose columns each carry the
    interface.  ``lazy_isinstance`` avoids a hard import of cudf so this
    works when cudf is not installed.
    """
    # NOTE(review): the diffed span contained both the old CUDF_INSTALLED
    # check and the new lazy_isinstance form; only the final version is kept.
    return hasattr(data, '__cuda_array_interface__') or \
        lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame')


def _cudf_array_interfaces(df):
Expand Down Expand Up @@ -508,7 +507,7 @@ def set_uint_info(self, field, data):
def set_interface_info(self, field, data):
"""Set info type property into DMatrix."""
# If we are passed a dataframe, extract the series
if CUDF_INSTALLED and isinstance(data, CUDF_DataFrame):
if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'):
if len(data.columns) != 1:
raise ValueError(
'Expecting meta-info to contain a single column')
Expand Down
62 changes: 31 additions & 31 deletions python-package/xgboost/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from .compat import da, dd, delayed, get_client
from .compat import sparse, scipy_sparse
from .compat import PANDAS_INSTALLED, DataFrame, Series, pandas_concat
from .compat import CUDF_INSTALLED, CUDF_DataFrame, CUDF_Series, CUDF_concat
from .compat import CUDF_concat
from .compat import lazy_isinstance

from .core import DMatrix, Booster, _expect
Expand Down Expand Up @@ -97,7 +97,8 @@ def concat(value): # pylint: disable=too-many-return-statements
return sparse.concatenate(value, axis=0)
if PANDAS_INSTALLED and isinstance(value[0], (DataFrame, Series)):
return pandas_concat(value, axis=0)
if CUDF_INSTALLED and isinstance(value[0], (CUDF_DataFrame, CUDF_Series)):
if lazy_isinstance(value[0], 'cudf.core.dataframe', 'DataFrame') or \
lazy_isinstance(value[0], 'cudf.core.series', 'Series'):
return CUDF_concat(value, axis=0)
if lazy_isinstance(value[0], 'cupy.core.core', 'ndarray'):
import cupy # pylint: disable=import-error
Expand Down Expand Up @@ -461,6 +462,25 @@ def dispatched_train(worker_addr):
return list(filter(lambda ret: ret is not None, results))[0]


def _direct_predict_impl(client, data, predict_fn):
    """Map ``predict_fn`` directly over a dask collection.

    ``predict_fn`` is applied per-chunk for a dask array and per-partition
    for a dask dataframe; its second argument tells it whether the input is
    a dataframe.  Returns a dask array, or a dask series (the single
    prediction column) for dataframe input.  Raises TypeError for any other
    input type.
    """
    if isinstance(data, da.Array):
        # Chunk-wise prediction; drop_axis=1 collapses the feature axis.
        return client.submit(
            da.map_blocks,
            predict_fn, data, False, drop_axis=1,
            dtype=numpy.float32
        ).result()
    if isinstance(data, dd.DataFrame):
        frame = client.submit(
            dd.map_partitions,
            predict_fn, data, True,
            meta=dd.utils.make_meta({'prediction': 'f4'})
        ).result()
        # Extract the single prediction column as a series.
        return frame.iloc[:, 0]
    raise TypeError('data of type: ' + str(type(data)) +
                    ' is not supported by direct prediction')


def predict(client, model, data, *args, missing=numpy.nan):
'''Run prediction with a trained booster.

Expand Down Expand Up @@ -502,26 +522,19 @@ def predict(client, model, data, *args, missing=numpy.nan):

def mapped_predict(partition, is_df):
    # Runs on a dask worker: predict on one partition/chunk of the input.
    # Closure over `booster`, `missing` and `args` from the enclosing
    # predict() scope.
    worker = distributed_get_worker()
    booster.set_param({'nthread': worker.nthreads})
    m = DMatrix(partition, missing=missing, nthread=worker.nthreads)
    predt = booster.predict(m, *args, validate_features=False)
    if is_df:
        # NOTE(review): the diff interleaved the old unconditional pandas
        # wrap with the new cudf-aware branch; only the final form is kept.
        if lazy_isinstance(partition, 'cudf', 'core.dataframe.DataFrame'):
            # GPU input -> return a GPU dataframe so downstream stays on
            # device.
            import cudf  # pylint: disable=import-error
            predt = cudf.DataFrame(predt, columns=['prediction'])
        else:
            predt = DataFrame(predt, columns=['prediction'])
    return predt

if isinstance(data, da.Array):
predictions = client.submit(
da.map_blocks,
mapped_predict, data, False, drop_axis=1,
dtype=numpy.float32
).result()
return predictions
if isinstance(data, dd.DataFrame):
predictions = client.submit(
dd.map_partitions,
mapped_predict, data, True,
meta=dd.utils.make_meta({'prediction': 'f4'})
).result()
return predictions.iloc[:, 0]
if isinstance(data, (da.Array, dd.DataFrame)):
return _direct_predict_impl(client, data, mapped_predict)

# Prediction on dask DMatrix.
worker_map = data.worker_map
Expand Down Expand Up @@ -644,20 +657,7 @@ def mapped_predict(data, is_df):
dtype=numpy.float32)
return prediction

if isinstance(data, da.Array):
predictions = client.submit(
da.map_blocks,
mapped_predict, data, False, drop_axis=1,
dtype=numpy.float32
).result()
return predictions
if isinstance(data, dd.DataFrame):
predictions = client.submit(
dd.map_partitions,
mapped_predict, data, True,
meta=dd.utils.make_meta({'prediction': 'f4'})
).result()
return predictions.iloc[:, 0]
return _direct_predict_impl(client, data, mapped_predict)


def _evaluation_matrices(client, validation_set, sample_weights, missing):
Expand Down
18 changes: 16 additions & 2 deletions tests/python-gpu/test_gpu_with_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ def test_dask_dataframe(self):
out = dxgb.train(client, {'tree_method': 'gpu_hist'},
dtrain=dtrain,
evals=[(dtrain, 'X')],
num_boost_round=2)
num_boost_round=4)

assert isinstance(out['booster'], dxgb.Booster)
assert len(out['history']['X']['rmse']) == 2
assert len(out['history']['X']['rmse']) == 4

predictions = dxgb.predict(client, out, dtrain).compute()
assert isinstance(predictions, np.ndarray)
Expand All @@ -62,6 +62,20 @@ def test_dask_dataframe(self):
cupy.testing.assert_allclose(single_node, predictions)
cupy.testing.assert_allclose(single_node, series_predictions)

predt = dxgb.predict(client, out, X)
assert isinstance(predt, dd.Series)

def is_df(part):
    # Assert that each partition of the GPU prediction is a cuDF
    # DataFrame (i.e. the result stayed on device); identity otherwise.
    assert isinstance(part, cudf.DataFrame), part
    return part

predt.map_partitions(
is_df,
meta=dd.utils.make_meta({'prediction': 'f4'}))

cupy.testing.assert_allclose(
predt.values.compute(), single_node)

@pytest.mark.skipif(**tm.no_cupy())
@pytest.mark.mgpu
def test_dask_array(self):
Expand Down
8 changes: 7 additions & 1 deletion tests/python/testing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# coding: utf-8
from xgboost.compat import SKLEARN_INSTALLED, PANDAS_INSTALLED
from xgboost.compat import CUDF_INSTALLED, DASK_INSTALLED
from xgboost.compat import DASK_INSTALLED


def no_sklearn():
Expand Down Expand Up @@ -46,6 +46,12 @@ def no_dask_cuda():


def no_cudf():
    """Build a pytest.mark.skipif kwargs dict for tests that need cuDF."""
    cudf_found = True
    try:
        import cudf  # noqa
    except ImportError:
        cudf_found = False

    return {'condition': not cudf_found,
            'reason': 'CUDF is not installed'}

Expand Down