
[BUG] XGBoost.dask.predict returning an incompatible dask_cudf Series #5823

Closed
think-high opened this issue Jun 23, 2020 · 3 comments

Comments

@think-high

Bug Description
I am trying to subtract the result of `xgb.dask.predict` from another dask_cudf Series, but that raises an error. I believe this is because each individual partition of the prediction is not wrapped as a cudf Series. I can work around it by running `map_partitions` myself to wrap each partition as a cudf Series. It would be good to do this internally in `xgb.dask.predict` before returning the result.

Here is a code snippet to reproduce the bug:

```python
%%time
import numpy as np
import cudf
import dask_cudf
import xgboost as xgb
from dask.distributed import wait

size = 1000000
npartitions = 8
cdf = cudf.DataFrame({'x': np.random.randint(0, npartitions, size=size),
                      'y': np.random.normal(size=size)})
ddf = dask_cudf.from_cudf(cdf, npartitions=npartitions)

X_train = ddf.query('y < 0.5')
Y_train = X_train[['x']]
X_train = X_train[X_train.columns.difference(['x'])]
X_test = ddf.query('y > 0.5')
Y_test = X_test[['x']]
X_test = X_test[X_test.columns.difference(['x'])]
done = wait([X_train, Y_train])

dtrain = xgb.dask.DaskDMatrix(client, X_train, Y_train)
params = {
    'num_rounds':   100,   # consumed below as num_boost_round, not by XGBoost itself
    'max_depth':    8,
    'max_leaves':   2**8,
    'tree_method':  'gpu_hist',
    'objective':    'reg:squarederror',
    'grow_policy':  'lossguide'
}
trained_model = xgb.dask.train(client, params, dtrain,
                               num_boost_round=params['num_rounds'])
booster = trained_model["booster"]
booster.set_param({'predictor': 'gpu_predictor'})
pred = xgb.dask.predict(client, booster, X_test)
true = Y_test['x'].reset_index(drop=True)

print(type(pred), type(true))
## This gives: (dask_cudf.core.Series, dask_cudf.core.Series)

diff = pred - true
diff.head()
```

`diff.head()` throws an error. The block below contains the error trace:

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-233-f7826efbac4c> in <module>
      5 true = true.reset_index(drop=True)
      6 temp = pred["prediction"] - true["x"]
----> 7 temp.head()
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/dask/dataframe/core.py in head(self, n, npartitions, compute)
   1003             Whether to compute the result, default is True.
   1004         """
-> 1005         return self._head(n=n, npartitions=npartitions, compute=compute, safe=True)
   1006 
   1007     def _head(self, n, npartitions, compute, safe):
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/dask/dataframe/core.py in _head(self, n, npartitions, compute, safe)
   1036 
   1037         if compute:
-> 1038             result = result.compute()
   1039         return result
   1040 
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    442         postcomputes.append(x.__dask_postcompute__())
    443 
--> 444     results = schedule(dsk, keys, **kwargs)
    445     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    446 
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2672                     should_rejoin = False
   2673             try:
-> 2674                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2675             finally:
   2676                 for f in futures.values():
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1972                 direct=direct,
   1973                 local_worker=local_worker,
-> 1974                 asynchronous=asynchronous,
   1975             )
   1976 
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    822         else:
    823             return sync(
--> 824                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    825             )
    826 
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1831                             exc = CancelledError(key)
   1832                         else:
-> 1833                             raise exception.with_traceback(traceback)
   1834                         raise exc
   1835                     if errors == "skip":
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/dask/optimization.py in __call__()
   1020         if not len(args) == len(self.inkeys):
   1021             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
-> 1022         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
   1023 
   1024     def __reduce__(self):
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/pandas/core/ops/__init__.py in wrapper()
   1046 
   1047         with np.errstate(all="ignore"):
-> 1048             result = na_op(lvalues, rvalues)
   1049         return construct_result(
   1050             left, result, index=left.index, name=res_name, dtype=None
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/pandas/core/ops/__init__.py in na_op()
    968             result = expressions.evaluate(op, str_rep, x, y, **eval_kwargs)
    969         except TypeError:
--> 970             result = masked_arith_op(x, y, op)
    971 
    972         return missing.dispatch_fill_zeros(op, x, y, result)
~/anaconda3/envs/rapids-env2/lib/python3.7/site-packages/pandas/core/ops/__init__.py in masked_arith_op()
    448 
    449     else:
--> 450         assert is_scalar(y), type(y)
    451         assert isinstance(x, np.ndarray), type(x)
    452         # mask is only meaningful for x
AssertionError: <class 'cudf.core.series.Series'>

If I add `pred = pred.map_partitions(lambda part: cudf.Series(part))` after `xgb.dask.predict`, it works fine. If my understanding is right, it would be good to add this conversion inside `xgb.dask.predict()` itself before returning the result.

@trivialfis
Member

I think it's done in #5710 .

@trivialfis
Member

Also, if you decide to run the latest master branch, you can try inplace prediction with the dask interface. It can yield higher performance and lower memory usage.
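For reference, a minimal sketch of that suggestion, assuming an XGBoost build from master that ships `xgb.dask.inplace_predict` and reusing `client`, `booster`, and `X_test` from the reproduction snippet above. This is an illustrative fragment, not a complete program, and it needs a RAPIDS/GPU environment to run:

```python
# Inplace prediction runs directly on the input data without first
# constructing a DaskDMatrix, which avoids an extra copy of the test set.
pred = xgb.dask.inplace_predict(client, booster, X_test)

# The result is still lazy; materialize a preview as usual.
print(pred.head())
```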

@think-high
Author

Thanks, that was indeed helpful and improved the performance.
