Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python-package] [dask] Add DaskLGBMRanker #3708

Merged
merged 20 commits into from
Jan 22, 2021
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 61 additions & 17 deletions python-package/lightgbm/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from dask.distributed import Client, default_client, get_worker, wait

from .basic import _LIB, _safe_call
from .sklearn import LGBMClassifier, LGBMRegressor
from .sklearn import LGBMClassifier, LGBMRegressor, LGBMRanker

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -132,15 +132,24 @@ def _train_part(params, model_factory, list_of_parts, worker_address_to_port, re
}
params.update(network_params)

is_ranker = model_factory.__qualname__ == 'LGBMRanker'
ffineis marked this conversation as resolved.
Show resolved Hide resolved

# Concatenate many parts into one
parts = tuple(zip(*list_of_parts))
data = _concat(parts[0])
label = _concat(parts[1])
weight = _concat(parts[2]) if len(parts) == 3 else None

try:
model = model_factory(**params)
model.fit(data, label, sample_weight=weight, **kwargs)

if is_ranker:
group = _concat(parts[-1])
weight = _concat(parts[2]) if len(parts) == 4 else None
model.fit(data, y=label, sample_weight=weight, group=group, **kwargs)
else:
weight = _concat(parts[2]) if len(parts) == 3 else None
model.fit(data, y=label, sample_weight=weight, **kwargs)

finally:
_safe_call(_LIB.LGBM_NetworkFree())

Expand All @@ -155,7 +164,7 @@ def _split_to_parts(data, is_matrix):
return parts


def _train(client, data, label, params, model_factory, weight=None, **kwargs):
def _train(client, data, label, params, model_factory, sample_weight=None, group=None, **kwargs):
"""Inner train routine.

Parameters
Expand All @@ -166,20 +175,32 @@ def _train(client, data, label, params, model_factory, weight=None, **kwargs):
y : dask array of shape = [n_samples]
The target values (class labels in classification, real numbers in regression).
params : dict
model_factory : lightgbm.LGBMClassifier or lightgbm.LGBMRegressor class
model_factory : lightgbm.LGBMClassifier, lightgbm.LGBMRegressor, or lightgbm.LGBMRanker class
sample_weight : array-like of shape = [n_samples] or None, optional (default=None)
Weights of training data.
Weights of training data.
group : array-like where sum(group) = [n_samples] or None for non-ranking objectives (default=None)
Group/query data, only used for ranking task. sum(group) = n_samples. For example,
if you have a 100-record dataset with `group = [10, 20, 40, 10, 10]`, that means that you have
5 groups, where the first 10 records are in the first group, records 11-30 are the second group, etc.
StrikerRUS marked this conversation as resolved.
Show resolved Hide resolved
"""
# Split arrays/dataframes into parts. Arrange parts into tuples to enforce co-locality
# Split arrays/dataframes into parts. Arrange parts into dicts to enforce co-locality
jameslamb marked this conversation as resolved.
Show resolved Hide resolved
data_parts = _split_to_parts(data, is_matrix=True)
label_parts = _split_to_parts(label, is_matrix=False)
if weight is None:
parts = list(map(delayed, zip(data_parts, label_parts)))
weight_parts = _split_to_parts(sample_weight, is_matrix=False) if sample_weight is not None else None
group_parts = _split_to_parts(group, is_matrix=False) if group is not None else None

# choose between four options of (sample_weight, group) being (un)specified
if weight_parts is None and group_parts is None:
jameslamb marked this conversation as resolved.
Show resolved Hide resolved
parts = zip(data_parts, label_parts)
elif weight_parts is not None and group_parts is None:
parts = zip(data_parts, label_parts, weight_parts)
elif weight_parts is None and group_parts is not None:
parts = zip(data_parts, label_parts, group_parts)
else:
weight_parts = _split_to_parts(weight, is_matrix=False)
parts = list(map(delayed, zip(data_parts, label_parts, weight_parts)))
parts = zip(data_parts, label_parts, weight_parts, group_parts)

# Start computation in the background
parts = list(map(delayed, parts))
jameslamb marked this conversation as resolved.
Show resolved Hide resolved
parts = client.compute(parts)
wait(parts)

Expand Down Expand Up @@ -252,7 +273,7 @@ def _predict(model, data, proba=False, dtype=np.float32, **kwargs):

Parameters
----------
model :
model : local lightgbm.LGBM[Classifier/Regressor/Ranker]
StrikerRUS marked this conversation as resolved.
Show resolved Hide resolved
data : dask array of shape = [n_samples, n_features]
Input feature matrix.
proba : bool
Expand All @@ -275,13 +296,13 @@ def _predict(model, data, proba=False, dtype=np.float32, **kwargs):

class _LGBMModel:

def _fit(self, model_factory, X, y=None, sample_weight=None, client=None, **kwargs):
def _fit(self, model_factory, X, y=None, sample_weight=None, group=None, client=None, **kwargs):
"""Docstring is inherited from the LGBMModel."""
if client is None:
client = default_client()

params = self.get_params(True)
model = _train(client, X, y, params, model_factory, sample_weight, **kwargs)
model = _train(client, X, y, params, model_factory, sample_weight, group, **kwargs)
StrikerRUS marked this conversation as resolved.
Show resolved Hide resolved

self.set_params(**model.get_params())
self._copy_extra_params(model, self)
Expand All @@ -306,8 +327,8 @@ class DaskLGBMClassifier(_LGBMModel, LGBMClassifier):
"""Distributed version of lightgbm.LGBMClassifier."""

def fit(self, X, y=None, sample_weight=None, client=None, **kwargs):
"""Docstring is inherited from the LGBMModel."""
return self._fit(LGBMClassifier, X, y, sample_weight, client, **kwargs)
"""Docstring is inherited from the lightgbm.LGBMClassifier.fit."""
return self._fit(LGBMClassifier, X=X, y=y, sample_weight=sample_weight, client=client, **kwargs)
fit.__doc__ = LGBMClassifier.fit.__doc__

def predict(self, X, **kwargs):
Expand Down Expand Up @@ -335,7 +356,7 @@ class DaskLGBMRegressor(_LGBMModel, LGBMRegressor):

def fit(self, X, y=None, sample_weight=None, client=None, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMRegressor.fit."""
return self._fit(LGBMRegressor, X, y, sample_weight, client, **kwargs)
return self._fit(LGBMRegressor, X=X, y=y, sample_weight=sample_weight, client=client, **kwargs)
fit.__doc__ = LGBMRegressor.fit.__doc__

def predict(self, X, **kwargs):
Expand All @@ -351,3 +372,26 @@ def to_local(self):
model : lightgbm.LGBMRegressor
"""
return self._to_local(LGBMRegressor)


class DaskLGBMRanker(_LGBMModel, LGBMRanker):
"""Docstring is inherited from the lightgbm.LGBMRanker."""

def fit(self, X, y=None, sample_weight=None, group=None, client=None, **kwargs):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jameslamb What about init_score? Is it supported or we should add feature request for it?

init_score : array-like of shape = [n_samples] or None, optional (default=None)
Init score of training data.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should have a feature request. I'll write it up and add a link here.

@ffineis, could you add init_score=None here between sample_weight and group, so the order matches the sklearn interface for LGBMRanker? (

sample_weight=None, init_score=None, group=None,
). That way, if people have existing sklearn code with positional arguments to fit(), they won't accidentally have their init_score interpreted as group.

And can you just then add a check like this?

if init_score is not None:
    raise RuntimeError("init_score is not currently supported in lightgbm.dask")

@StrikerRUS speaking of positional arguments, I'll open another issue where we can discuss how to handle the client argument. But let's leave that out of this PR.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jameslamb
Yes, sure! Agree with all your intents.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

init_score: #3807

client placement: #3808

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, no prob

"""Docstring is inherited from the lightgbm.LGBMRanker.fit."""
return self._fit(LGBMRanker, X=X, y=y, sample_weight=sample_weight, group=group, client=client, **kwargs)
fit.__doc__ = LGBMRanker.fit.__doc__

def predict(self, X, **kwargs):
"""Docstring is inherited from the lightgbm.LGBMRanker.predict."""
return _predict(self.to_local(), X, **kwargs)
predict.__doc__ = LGBMRanker.predict.__doc__

def to_local(self):
"""Create regular version of lightgbm.LGBMRanker from the distributed version.

Returns
-------
model : lightgbm.LGBMRanker
"""
return self._to_local(LGBMRanker)
Loading