From 2ebdec8aa66f67c886e445ee278940c5a1a586dc Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Mon, 14 Oct 2019 23:19:34 -0400 Subject: [PATCH] Fix dask prediction. (#4941) * Fix dask prediction. * Add better error messages for wrong partition. --- demo/dask/cpu_training.py | 9 ++++--- demo/dask/gpu_training.py | 9 +++---- python-package/xgboost/dask.py | 44 +++++++++++++++++++++++++--------- src/data/simple_csr_source.cc | 2 +- tests/python/test_with_dask.py | 11 ++++++--- 5 files changed, 51 insertions(+), 24 deletions(-) diff --git a/demo/dask/cpu_training.py b/demo/dask/cpu_training.py index dac734371243..d520b3639bb9 100644 --- a/demo/dask/cpu_training.py +++ b/demo/dask/cpu_training.py @@ -7,11 +7,10 @@ def main(client): # generate some random data for demonstration - n = 100 m = 100000 - partition_size = 1000 - X = da.random.random((m, n), partition_size) - y = da.random.random(m, partition_size) + n = 100 + X = da.random.random(size=(m, n), chunks=100) + y = da.random.random(size=(m, ), chunks=100) # DaskDMatrix acts like normal DMatrix, works as a proxy for local # DMatrix scatter around workers. @@ -38,6 +37,6 @@ def main(client): if __name__ == '__main__': # or use other clusters for scaling - with LocalCluster(n_workers=4, threads_per_worker=1) as cluster: + with LocalCluster(n_workers=7, threads_per_worker=1) as cluster: with Client(cluster) as client: main(client) diff --git a/demo/dask/gpu_training.py b/demo/dask/gpu_training.py index 807613e61488..c6530d886b35 100644 --- a/demo/dask/gpu_training.py +++ b/demo/dask/gpu_training.py @@ -6,11 +6,11 @@ def main(client): - n = 100 + # generate some random data for demonstration m = 100000 - partition_size = 1000 - X = da.random.random((m, n), partition_size) - y = da.random.random(m, partition_size) + n = 100 + X = da.random.random(size=(m, n), chunks=100) + y = da.random.random(size=(m, ), chunks=100) # DaskDMatrix acts like normal DMatrix, works as a proxy for local # DMatrix scatter around workers. @@ -23,6 +23,7 @@ def main(client): output = xgb.dask.train(client, {'verbosity': 2, 'nthread': 1, + # Golden line for GPU training 'tree_method': 'gpu_hist'}, dtrain, num_boost_round=4, evals=[(dtrain, 'train')]) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 0ea7c7315aa6..80cb47c07590 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -139,13 +139,14 @@ def __init__(self, self._missing = missing if len(data.shape) != 2: - _expect('2 dimensions input', data.shape) + raise ValueError( + 'Expecting 2 dimensional input, got: {shape}'.format( + shape=data.shape)) - if not any(isinstance(data, t) for t in (dd.DataFrame, da.Array)): + if not isinstance(data, (dd.DataFrame, da.Array)): raise TypeError(_expect((dd.DataFrame, da.Array), type(data))) - if not any( - isinstance(label, t) - for t in (dd.DataFrame, da.Array, dd.Series, type(None))): + if not isinstance(label, (dd.DataFrame, da.Array, dd.Series, + type(None))): raise TypeError( _expect((dd.DataFrame, da.Array, dd.Series), type(label))) @@ -158,6 +159,23 @@ def __init__(self, async def map_local_data(self, client, data, label=None, weights=None): '''Obtain references to local data.''' + + def inconsistent(left, left_name, right, right_name): + msg = 'Partitions between {a_name} and {b_name} are not ' \ + 'consistent: {a_len} != {b_len}. ' \ + 'Please try to repartition/rechunk your data.'.format( + a_name=left_name, b_name=right_name, a_len=len(left), + b_len=len(right) + ) + return msg + + def check_columns(parts): + # x is required to be 2 dim in __init__ + assert parts.ndim == 1 or parts.shape[1], 'Data should be' \ + ' partitioned by row. To avoid this specify the number' \ + ' of columns for your dask Array explicitly. e.g.' \ + ' chunks=(partition_size, X.shape[1])' + data = data.persist() if label is not None: label = label.persist() @@ -169,28 +187,28 @@ async def map_local_data(self, client, data, label=None, weights=None): # equivalents. X_parts = data.to_delayed() if isinstance(X_parts, numpy.ndarray): - assert X_parts.shape[1] == 1 + check_columns(X_parts) X_parts = X_parts.flatten().tolist() if label is not None: y_parts = label.to_delayed() if isinstance(y_parts, numpy.ndarray): - assert y_parts.ndim == 1 or y_parts.shape[1] == 1 + check_columns(y_parts) y_parts = y_parts.flatten().tolist() if weights is not None: w_parts = weights.to_delayed() if isinstance(w_parts, numpy.ndarray): - assert w_parts.ndim == 1 or w_parts.shape[1] == 1 + check_columns(w_parts) w_parts = w_parts.flatten().tolist() parts = [X_parts] if label is not None: assert len(X_parts) == len( - y_parts), 'Partitions between X and y are not consistent' + y_parts), inconsistent(X_parts, 'X', y_parts, 'labels') parts.append(y_parts) if weights is not None: assert len(X_parts) == len( - w_parts), 'Partitions between X and weight are not consistent.' + w_parts), inconsistent(X_parts, 'X', w_parts, 'weights') parts.append(w_parts) parts = list(map(delayed, zip(*parts))) @@ -275,7 +293,11 @@ def get_worker_data_shape(self, worker): cols = 0 for shape in shapes: rows += shape[0] - cols += shape[1] + + c = shape[1] + assert cols in (0, c), 'Shape between partitions are not the' \ + ' same. Got: {left} and {right}'.format(left=c, right=cols) + cols = c return (rows, cols) diff --git a/src/data/simple_csr_source.cc b/src/data/simple_csr_source.cc index f114c81c15d2..2723e9a2fa84 100644 --- a/src/data/simple_csr_source.cc +++ b/src/data/simple_csr_source.cc @@ -185,7 +185,7 @@ void SimpleCSRSource::CopyFrom(std::string const& cuda_interfaces_str, cuda_interfaces_str.size()}); std::vector const& columns = get(interfaces); size_t n_columns = columns.size(); - CHECK_GT(n_columns, 0) << "Number of columns must not be greater than 0."; + CHECK_GT(n_columns, 0) << "Number of columns must not eqaul to 0."; auto const& typestr = get(columns[0]["typestr"]); CHECK_EQ(typestr.size(), 3) << ColumnarErrors::TypestrFormat(); diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py index e9aff6bd7f5a..01b8c3d3f625 100644 --- a/tests/python/test_with_dask.py +++ b/tests/python/test_with_dask.py @@ -21,12 +21,12 @@ pass kRows = 1000 +kCols = 10 def generate_array(): - n = 10 partition_size = 20 - X = da.random.random((kRows, n), partition_size) + X = da.random.random((kRows, kCols), partition_size) y = da.random.random(kRows, partition_size) return X, y @@ -44,7 +44,7 @@ def test_from_dask_dataframe(client): prediction = xgb.dask.predict(client, model=booster, data=dtrain) assert isinstance(prediction, da.Array) - assert prediction.shape[0] == kRows, prediction + assert prediction.shape[0] == kRows and prediction.shape[1] == kCols with pytest.raises(ValueError): # evals_result is not supported in dask interface. @@ -59,6 +59,7 @@ def test_from_dask_array(client): result = xgb.dask.train(client, {}, dtrain) prediction = xgb.dask.predict(client, result, dtrain) + assert prediction.shape[0] == kRows and prediction.shape[1] == kCols assert isinstance(prediction, da.Array) @@ -71,6 +72,8 @@ def test_regressor(client): regressor.fit(X, y, eval_set=[(X, y)]) prediction = regressor.predict(X) + assert prediction.shape[0] == kRows and prediction.shape[1] == kCols + history = regressor.evals_result() assert isinstance(prediction, da.Array) @@ -88,6 +91,8 @@ def test_classifier(client): classifier.fit(X, y, eval_set=[(X, y)]) prediction = classifier.predict(X) + assert prediction.shape[0] == kRows and prediction.shape[1] == kCols + history = classifier.evals_result() assert isinstance(prediction, da.Array)