Use DMatrix Proxy for implementing data callback. #5629

Closed
wants to merge 2 commits into from
3 changes: 1 addition & 2 deletions Jenkinsfile-win64
@@ -28,7 +28,7 @@ pipeline {
steps {
script {
parallel ([
'build-win64-cuda9.0': { BuildWin64() }
'build-win64-cuda10.0': { BuildWin64() }
])
}
milestone ordinal: 2
@@ -40,7 +40,6 @@ pipeline {
script {
parallel ([
'test-win64-cpu': { TestWin64CPU() },
'test-win64-gpu-cuda9.0': { TestWin64GPU(cuda_target: 'cuda9') },
'test-win64-gpu-cuda10.0': { TestWin64GPU(cuda_target: 'cuda10_0') },
'test-win64-gpu-cuda10.1': { TestWin64GPU(cuda_target: 'cuda10_1') }
])
49 changes: 46 additions & 3 deletions demo/dask/gpu_training.py
@@ -5,7 +5,7 @@
from xgboost.dask import DaskDMatrix


def main(client):
def with_dask_dmatrix(client):
# generate some random data for demonstration
m = 100000
n = 100
@@ -15,6 +15,48 @@ def main(client):
    # DaskDMatrix acts like a normal DMatrix; it works as a proxy for the
    # local DMatrix objects scattered around the workers.
dtrain = DaskDMatrix(client, X, y)
# dtrain = xgb.dask.DaskDeviceQuantileDMatrix(client, X, y)

# Use train method from xgboost.dask instead of xgboost. This
# distributed version of train returns a dictionary containing the
# resulting booster and evaluation history obtained from
# evaluation metrics.
output = xgb.dask.train(client,
{'verbosity': 2,
# Golden line for GPU training
'tree_method': 'gpu_hist'},
dtrain,
num_boost_round=4, evals=[(dtrain, 'train')])
bst = output['booster']
history = output['history']

# you can pass output directly into `predict` too.
prediction = xgb.dask.predict(client, bst, dtrain)
prediction = prediction.compute()
print('Evaluation history:', history)
return prediction


def with_dask_device_dmatrix(client):
try:
import cupy
except ImportError:
print('Skipping demo `with_dask_device_dmatrix`.')
return None

# generate some random data for demonstration
m = 100000
n = 100
X = da.random.random(size=(m, n), chunks=100)
y = da.random.random(size=(m, ), chunks=100)
# Map generated data into GPU for demonstration purposes.
X = X.map_blocks(cupy.array)
y = y.map_blocks(cupy.array)

    # Compared to DaskDMatrix, DaskDeviceQuantileDMatrix consumes much less
    # GPU memory, but it only works with the `gpu_hist` tree method and
    # accepts only GPU input data, while DaskDMatrix works with any tree
    # method.
dtrain = xgb.dask.DaskDeviceQuantileDMatrix(client, X, y)

# Use train method from xgboost.dask instead of xgboost. This
# distributed version of train returns a dictionary containing the
@@ -40,6 +82,7 @@ def main(client):
# `LocalCUDACluster` is used for assigning GPU to XGBoost processes. Here
# `n_workers` represents the number of GPUs since we use one GPU per worker
# process.
with LocalCUDACluster(n_workers=2, threads_per_worker=4) as cluster:
with LocalCUDACluster(threads_per_worker=4) as cluster:
with Client(cluster) as client:
main(client)
with_dask_dmatrix(client)
with_dask_device_dmatrix(client)
98 changes: 98 additions & 0 deletions demo/guide-python/data_iterator.py
@@ -0,0 +1,98 @@
'''A demo for defining a data iterator.

This demo defines a customized iterator for passing batches of data into
`xgboost.DMatrix` and uses this `DMatrix` for training.

After going through the demo, one might ask why we don't use a more native
Python iterator.  That's because XGBoost requires a `reset` function, while
using `itertools.tee` might incur significant memory usage according to:

https://docs.python.org/3/library/itertools.html#itertools.tee

A short illustrative aside on this point follows the constants below.
'''

import xgboost
import cupy
import numpy

COLS = 64
ROWS_PER_BATCH = 1000  # data is split by rows
BATCHES = 32
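

# An illustrative aside, not part of the original demo: a plain Python
# generator (the "more native" iterator mentioned in the module docstring
# above) can be consumed only once, so XGBoost would have to duplicate the
# stream, e.g. with `itertools.tee`, in order to read the data a second time,
# potentially buffering many batches in memory.  A `DataIter` avoids this by
# exposing a `reset` method that simply rewinds the same object.
def one_shot_batches(batches):
    '''A plain generator: usable for a single pass, with no way to rewind.'''
    for batch in batches:
        yield batch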


class IterForDMatrixDemo(xgboost.core.DataIter):
'''A data iterator for XGBoost DMatrix.

    `reset` and `next` are required for any data iterator; the other functions
    here are utilities for demonstration purposes.

'''
def __init__(self):
        '''Generate some random data for demonstration.

Actual data can be anything that is currently supported by XGBoost.
'''
self.rows = ROWS_PER_BATCH
self.cols = COLS
rng = cupy.random.RandomState(1994)
self._data = [rng.randn(self.rows, self.cols)] * BATCHES
self._labels = [rng.randn(self.rows)] * BATCHES

self.it = 0 # set iterator to 0
super().__init__()

def as_array(self):
return cupy.concatenate(self._data)

def as_array_labels(self):
return cupy.concatenate(self._labels)

def data(self):
'''Utility function for obtaining current batch of data.'''
return self._data[self.it]

def labels(self):
        '''Utility function for obtaining the current batch of labels.'''
return self._labels[self.it]

def reset(self):
'''Reset the iterator'''
self.it = 0

    def next(self, input_data):
        '''Feed the next batch of data to XGBoost and advance the iterator.'''
        if self.it == len(self._data):
            # Return 0 when there are no more batches.
            return 0
        input_data(data=self.data(), label=self.labels())
        self.it += 1
        return 1


def main():
rounds = 100
    it = IterForDMatrixDemo()

# Use iterator
m_with_it = xgboost.DMatrix(it)

m = xgboost.DeviceQuantileDMatrix(
it.as_array(), it.as_array_labels())

assert m_with_it.num_col() == m.num_col()
assert m_with_it.num_row() == m.num_row()

reg_with_it = xgboost.train({'tree_method': 'gpu_hist'}, m_with_it,
num_boost_round=rounds)
predict_with_it = reg_with_it.predict(m_with_it)

reg = xgboost.train({'tree_method': 'gpu_hist'}, m,
num_boost_round=rounds)
predict = reg.predict(m)

numpy.testing.assert_allclose(predict_with_it, predict,
rtol=1e6)


if __name__ == '__main__':
main()
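
As a rough, CPU-only illustration of the same callback pattern for readers
without a GPU, the sketch below swaps cupy for numpy and keeps the rest of the
`DataIter` interface unchanged. It assumes the iterator-accepting
`xgboost.DMatrix` constructor used in the demo above behaves the same way with
CPU batches, and `NumpyIterDemo` is a hypothetical name introduced here rather
than part of this pull request.

import numpy
import xgboost


class NumpyIterDemo(xgboost.core.DataIter):
    '''Feed a list of NumPy batches to XGBoost through the data callback.'''
    def __init__(self, n_batches=4, rows=1000, cols=16):
        rng = numpy.random.RandomState(2020)
        self._data = [rng.randn(rows, cols) for _ in range(n_batches)]
        self._labels = [rng.randn(rows) for _ in range(n_batches)]
        self._it = 0
        super().__init__()

    def reset(self):
        '''Rewind to the first batch.'''
        self._it = 0

    def next(self, input_data):
        '''Hand the next batch to XGBoost; return 0 once all batches are used.'''
        if self._it == len(self._data):
            return 0
        input_data(data=self._data[self._it], label=self._labels[self._it])
        self._it += 1
        return 1


dtrain = xgboost.DMatrix(NumpyIterDemo())
booster = xgboost.train({'tree_method': 'hist'}, dtrain, num_boost_round=10)
print(booster.predict(dtrain)[:5])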