diff --git a/python-package/lightgbm/compat.py b/python-package/lightgbm/compat.py index fbd5dd4e3c04..ceb0b290731e 100644 --- a/python-package/lightgbm/compat.py +++ b/python-package/lightgbm/compat.py @@ -114,13 +114,14 @@ def _check_sample_weight(sample_weight, X, dtype=None): from dask.array import Array as dask_Array from dask.dataframe import DataFrame as dask_DataFrame from dask.dataframe import Series as dask_Series - from dask.distributed import Client, default_client, wait + from dask.distributed import Client, default_client, get_worker, wait DASK_INSTALLED = True except ImportError: DASK_INSTALLED = False delayed = None Client = object default_client = None + get_worker = None wait = None class dask_Array: diff --git a/python-package/lightgbm/dask.py b/python-package/lightgbm/dask.py index 60b1f627e6db..fd9110477352 100644 --- a/python-package/lightgbm/dask.py +++ b/python-package/lightgbm/dask.py @@ -9,7 +9,7 @@ import socket from collections import defaultdict from copy import deepcopy -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Type, Union +from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple, Type, Union from urllib.parse import urlparse import numpy as np @@ -17,7 +17,8 @@ from .basic import _LIB, LightGBMError, _choose_param_value, _ConfigAliases, _log_info, _log_warning, _safe_call from .compat import (DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED, Client, LGBMNotFittedError, concat, - dask_Array, dask_DataFrame, dask_Series, default_client, delayed, pd_DataFrame, pd_Series, wait) + dask_Array, dask_DataFrame, dask_Series, default_client, delayed, get_worker, pd_DataFrame, + pd_Series, wait) from .sklearn import LGBMClassifier, LGBMModel, LGBMRanker, LGBMRegressor, _lgbmmodel_doc_fit, _lgbmmodel_doc_predict _DaskCollection = Union[dask_Array, dask_DataFrame, dask_Series] @@ -144,6 +145,7 @@ def _train_part( num_machines: int, return_model: bool, time_out: int = 120, + evals_provided: bool = False, **kwargs: Any ) -> Optional[LGBMModel]: network_params = { @@ -170,12 +172,107 @@ def _train_part( else: group = None + # construct local eval_set data. + local_eval_set = None + local_eval_names = None + local_eval_sample_weight = None + local_eval_group = None + n_evals = max([len(x.get('eval_set', [])) for x in list_of_parts]) + has_eval_weights = any([x.get('eval_sample_weight') is not None for x in list_of_parts]) + if n_evals: + + local_eval_set = [] + if has_eval_weights: + local_eval_sample_weight = [] + if is_ranker: + local_eval_group = [] + + # consolidate parts of each individual eval component. + for i in range(n_evals): + x_e = [] + y_e = [] + w_e = [] + g_e = [] + for part in list_of_parts: + + if not part.get('eval_set'): + continue + + # possible that not each part contains parts of each individual (X, y) eval set. 
+ if i >= len(part['eval_set']): + continue + + eval_set = part['eval_set'][i] + if eval_set == '__train__': + x_e.append(part['data']) + y_e.append(part['label']) + else: + x, y = eval_set + x_e.extend(x) + y_e.extend(y) + + eval_weight = part.get('eval_sample_weight') + if eval_weight: + if eval_weight[i] == '__sample_weight__': + w_e.append(part['weight']) + else: + w_e.extend(eval_weight[i]) + + eval_group = part.get('eval_group') + if eval_group: + if eval_group[i] == '__group__': + g_e.append(part['group']) + else: + g_e.extend(eval_group[i]) + + eval_names = part.get('eval_names') + if eval_names: + if not local_eval_names: + local_eval_names = eval_names + elif len(eval_names) > len(local_eval_names): + local_eval_names = eval_names + + # _concat each eval component. + local_eval_set.append((_concat(x_e), _concat(y_e))) + if w_e: + local_eval_sample_weight.append(_concat(w_e)) + if g_e: + local_eval_group.append(_concat(g_e)) + + else: + # when a worker receives no eval_set while other workers have eval data, causes LightGBMExceptions. + if evals_provided: + local_worker_address = get_worker().address + msg = "eval_set was provided but worker %s was not allocated validation data. Try rebalancing data across workers." + raise RuntimeError(msg % local_worker_address) + try: model = model_factory(**params) if is_ranker: - model.fit(data, label, sample_weight=weight, group=group, **kwargs) + model.fit( + data, + label, + sample_weight=weight, + group=group, + eval_set=local_eval_set, + eval_sample_weight=local_eval_sample_weight, + eval_group=local_eval_group, + eval_names=local_eval_names, + **kwargs + ) else: - model.fit(data, label, sample_weight=weight, **kwargs) + if 'eval_at' in kwargs: + kwargs.pop('eval_at') + + model.fit( + data, + label, + sample_weight=weight, + eval_set=local_eval_set, + eval_sample_weight=local_eval_sample_weight, + eval_names=local_eval_names, + **kwargs + ) finally: _safe_call(_LIB.LGBM_NetworkFree()) @@ -234,6 +331,10 @@ def _train( model_factory: Type[LGBMModel], sample_weight: Optional[_DaskCollection] = None, group: Optional[_DaskCollection] = None, + eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]] = None, + eval_names: Optional[List[str]] = None, + eval_sample_weight: Optional[List[_DaskCollection]] = None, + eval_group: Optional[List[_DaskCollection]] = None, **kwargs: Any ) -> LGBMModel: """Inner train routine. @@ -258,6 +359,14 @@ def _train( sum(group) = n_samples. For example, if you have a 100-document dataset with ``group = [10, 20, 40, 10, 10, 10]``, that means that you have 6 groups, where the first 10 records are in the first group, records 11-30 are in the second group, records 31-70 are in the third group, etc. + eval_set : List of (X, y) tuples of Dask data collections, or None, optional (default=None) + List of (X, y) tuple pairs to use as validation sets. + eval_names: List of strings or None, optional (default=None)) + Names of eval_set. + eval_sample_weight: List of Dask data collections or None, optional (default=None) + List of Dask Array or Dask Series, weights for each validation set in eval_set. + eval_group: List of Dask data collections or None, optional (default=None) + List of Dask Array or Dask Series, group/query for each validation set in eval_set. **kwargs Other parameters passed to ``fit`` method of the local underlying model. 
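
A minimal, self-contained sketch of the per-worker consolidation performed in `_train_part` above, with plain dicts and numpy arrays standing in for the delayed parts (the function and variable names here are illustrative, not part of the patch):

    import numpy as np

    def consolidate_local_eval_set(list_of_parts, n_evals):
        # mirror of the loop in _train_part: build one (X, y) tuple per eval set
        # by concatenating whatever pieces of that eval set this worker received.
        local_eval_set = []
        for i in range(n_evals):
            x_e, y_e = [], []
            for part in list_of_parts:
                if not part.get('eval_set') or i >= len(part['eval_set']):
                    continue  # this part holds no piece of eval set i
                eval_set = part['eval_set'][i]
                if eval_set == '__train__':
                    # sentinel: this eval set is the training data, so reuse the part's own chunk
                    x_e.append(part['data'])
                    y_e.append(part['label'])
                else:
                    x, y = eval_set
                    x_e.extend(x)
                    y_e.extend(y)
            local_eval_set.append((np.concatenate(x_e), np.concatenate(y_e)))
        return local_eval_set

    parts = [
        {'data': np.ones((2, 3)), 'label': np.zeros(2),
         'eval_set': ['__train__', ([np.ones((1, 3))], [np.ones(1)])]},
        {'data': np.ones((2, 3)), 'label': np.zeros(2),
         'eval_set': ['__train__']},
    ]
    print([x.shape for x, _ in consolidate_local_eval_set(parts, n_evals=2)])
    # -> [(4, 3), (1, 3)]

In the real code `_concat` also handles pandas objects and sparse matrices; numpy is used here only to keep the sketch runnable.
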
@@ -354,6 +463,90 @@ def _train( for i in range(n_parts): parts[i]['group'] = group_parts[i] + # evals_set will to be re-constructed into smaller lists of (X, y) tuples, where + # X and y are each delayed sub-lists of original eval dask Collections. + if eval_set: + eval_sets = defaultdict(list) + if eval_sample_weight: + eval_sample_weights = defaultdict(list) + if eval_group: + eval_groups = defaultdict(list) + + for i, (X, y) in enumerate(eval_set): + + # when individual eval set is equivalent to training data, skip recomputing parts. + if X is data and y is label: + for parts_idx in range(n_parts): + eval_sets[parts_idx].append('__train__') + + else: + eval_x_parts = _split_to_parts(data=X, is_matrix=True) + eval_y_parts = _split_to_parts(data=y, is_matrix=False) + + for j in range(len(eval_x_parts)): + parts_idx = j % n_parts + + x_e = eval_x_parts[j] + y_e = eval_y_parts[j] + + if j < n_parts: + eval_sets[parts_idx].append(([x_e], [y_e])) + + else: + eval_sets[parts_idx][-1][0].append(x_e) + eval_sets[parts_idx][-1][1].append(y_e) + + if eval_sample_weight: + if eval_sample_weight[i] is sample_weight: + for parts_idx in range(n_parts): + eval_sample_weights[parts_idx].append('__sample_weight__') + + else: + eval_w_parts = _split_to_parts(data=eval_sample_weight[i], is_matrix=False) + + # ensure that all evaluation parts map uniquely to one part. + for j in range(len(eval_w_parts)): + parts_idx = j % n_parts + + w_e = eval_w_parts[j] + + if j < n_parts: + eval_sample_weights[parts_idx].append([w_e]) + + else: + # n_evals = len(eval_sample_weights[parts_idx]) - 1 + eval_sample_weights[parts_idx][-1].append(w_e) + + if eval_group: + if eval_group[i] is group: + for parts_idx in range(n_parts): + eval_groups[parts_idx].append('__group__') + + else: + eval_g_parts = _split_to_parts(data=eval_group[i], is_matrix=False) + + # ensure that all evaluation parts map uniquely to one part. + for j in range(len(eval_g_parts)): + parts_idx = j % n_parts + g_e = eval_g_parts[j] + + if j < n_parts: + eval_groups[parts_idx].append([g_e]) + + else: + # n_evals = len(eval_groups[parts_idx]) - 1 + eval_groups[parts_idx][-1].append(g_e) + + # assign sub-eval_set components to worker parts. 
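
Before that assignment, a standalone recap of the `j % n_parts` rule used above, as a hedged sketch for a single eval set (chunk names are illustrative):

    def distribute_eval_chunks(eval_x_parts, n_parts):
        # the first n_parts chunks each open a new entry on their part;
        # any remaining chunks are appended to that part's existing entry.
        per_part = {idx: [] for idx in range(n_parts)}
        for j, x_e in enumerate(eval_x_parts):
            parts_idx = j % n_parts
            if j < n_parts:
                per_part[parts_idx].append([x_e])
            else:
                per_part[parts_idx][-1].append(x_e)
        return per_part

    print(distribute_eval_chunks(['c0', 'c1', 'c2', 'c3', 'c4'], n_parts=2))
    # -> {0: [['c0', 'c2', 'c4']], 1: [['c1', 'c3']]}

If an eval set has fewer chunks than there are training parts, some parts receive no slice of it; when that leaves an entire worker without validation data, the `evals_provided` check in `_train_part` raises the RuntimeError shown earlier.
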
+ for parts_idx, e_set in eval_sets.items(): + parts[parts_idx]['eval_set'] = e_set + if eval_names: + parts[parts_idx]['eval_names'] = [eval_names[i] for i in range(len(e_set))] + if eval_sample_weight: + parts[parts_idx]['eval_sample_weight'] = eval_sample_weights[parts_idx] + if eval_group: + parts[parts_idx]['eval_group'] = eval_groups[parts_idx] + # Start computation in the background parts = list(map(delayed, parts)) parts = client.compute(parts) @@ -440,6 +633,7 @@ def _train( num_machines=num_machines, time_out=params.get('time_out', 120), return_model=(worker == master_worker), + evals_provided=eval_set is not None, **kwargs ) for worker, list_of_parts in worker_map.items() @@ -606,15 +800,35 @@ def _lgb_dask_fit( X: _DaskMatrixLike, y: _DaskCollection, sample_weight: Optional[_DaskCollection] = None, + init_score: Optional[List[_DaskCollection]] = None, group: Optional[_DaskCollection] = None, + eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]] = None, + eval_names: Optional[List[str]] = None, + eval_sample_weight: Optional[List[_DaskCollection]] = None, + eval_class_weight: Optional[Union[dict, str]] = None, + eval_init_score: Optional[List[_DaskCollection]] = None, + eval_group: Optional[List[_DaskCollection]] = None, + eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, + early_stopping_rounds: Optional[int] = None, **kwargs: Any ) -> "_DaskLGBMModel": if not all((DASK_INSTALLED, PANDAS_INSTALLED, SKLEARN_INSTALLED)): raise LightGBMError('dask, pandas and scikit-learn are required for lightgbm.dask') + not_supported = ['init_score', 'eval_init_score', 'eval_class_weight'] + for ns in not_supported: + if eval(ns) is not None: + raise RuntimeError(f'{ns} is not currently supported in lightgbm.dask') + params = self.get_params(True) params.pop("client", None) + # easier to pass fit args as kwargs than to list_of_parts in _train. 
+ if eval_metric: + kwargs['eval_metric'] = eval_metric + if early_stopping_rounds: + kwargs['early_stopping_rounds'] = early_stopping_rounds + model = _train( client=_get_dask_client(self.client), data=X, @@ -623,6 +837,10 @@ def _lgb_dask_fit( model_factory=model_factory, sample_weight=sample_weight, group=group, + eval_set=eval_set, + eval_names=eval_names, + eval_sample_weight=eval_sample_weight, + eval_group=eval_group, **kwargs ) @@ -723,14 +941,35 @@ def fit( X: _DaskMatrixLike, y: _DaskCollection, sample_weight: Optional[_DaskCollection] = None, + init_score: Optional[_DaskCollection] = None, + eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]] = None, + eval_names: Optional[List[str]] = None, + eval_sample_weight: Optional[List[_DaskCollection]] = None, + eval_class_weight: Optional[List[_DaskCollection]] = None, + eval_init_score: Optional[List[_DaskCollection]] = None, + eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, + early_stopping_rounds: Optional[int] = None, **kwargs: Any ) -> "DaskLGBMClassifier": """Docstring is inherited from the lightgbm.LGBMClassifier.fit.""" + not_supported = ['init_score', 'eval_class_weight', 'eval_init_score'] + for ns in not_supported: + if eval(ns) is not None: + raise RuntimeError(f'{ns} is not currently supported in lightgbm.dask') + + if eval_metric: + kwargs['eval_metric'] = eval_metric + if early_stopping_rounds: + kwargs['early_stopping_rounds'] = early_stopping_rounds + return self._lgb_dask_fit( model_factory=LGBMClassifier, X=X, y=y, sample_weight=sample_weight, + eval_set=eval_set, + eval_names=eval_names, + eval_sample_weight=eval_sample_weight, **kwargs ) @@ -738,12 +977,18 @@ def fit( X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]", y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]", sample_weight_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)", - group_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)" + group_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)", + eval_sample_weight_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)', + eval_init_score_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)', + eval_group_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)' ) - # DaskLGBMClassifier does not support init_score, evaluation data, or early stopping + # DaskLGBMClassifier does not support init_score, eval_class_weight, or eval_init_score _base_doc = (_base_doc[:_base_doc.find('init_score :')] - + _base_doc[_base_doc.find('verbose :'):]) + + _base_doc[_base_doc.find('init_score :'):]) + + _base_doc = (_base_doc[:_base_doc.find('eval_class_weight :')] + + _base_doc[_base_doc.find('eval_init_score :'):]) # DaskLGBMClassifier support for callbacks and init_model is not tested fit.__doc__ = ( @@ -874,14 +1119,34 @@ def fit( X: _DaskMatrixLike, y: _DaskCollection, sample_weight: Optional[_DaskCollection] = None, + init_score: Optional[_DaskCollection] = None, + eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]] = None, + eval_names: Optional[List[str]] = None, + eval_sample_weight: Optional[List[_DaskCollection]] = None, + eval_init_score: Optional[List[_DaskCollection]] = None, + eval_metric: Optional[Union[Callable, str, List[Union[Callable, 
str]]]] = None, + early_stopping_rounds: Optional[int] = None, **kwargs: Any ) -> "DaskLGBMRegressor": """Docstring is inherited from the lightgbm.LGBMRegressor.fit.""" + not_supported = ['init_score', 'eval_init_score'] + for ns in not_supported: + if eval(ns) is not None: + raise RuntimeError(f'{ns} is not currently supported in lightgbm.dask') + + if eval_metric: + kwargs['eval_metric'] = eval_metric + if early_stopping_rounds: + kwargs['early_stopping_rounds'] = early_stopping_rounds + return self._lgb_dask_fit( model_factory=LGBMRegressor, X=X, y=y, sample_weight=sample_weight, + eval_set=eval_set, + eval_names=eval_names, + eval_sample_weight=eval_sample_weight, **kwargs ) @@ -889,12 +1154,18 @@ def fit( X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]", y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]", sample_weight_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)", - group_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)" + group_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)", + eval_sample_weight_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)', + eval_init_score_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)', + eval_group_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)' ) - # DaskLGBMRegressor does not support init_score, evaluation data, or early stopping + # DaskLGBMRegressor does not support init_score or eval_init_score _base_doc = (_base_doc[:_base_doc.find('init_score :')] - + _base_doc[_base_doc.find('verbose :'):]) + + _base_doc[_base_doc.find('init_score :'):]) + + _base_doc = (_base_doc[:_base_doc.find('eval_init_weight :')] + + _base_doc[_base_doc.find('eval_init_score :'):]) # DaskLGBMRegressor support for callbacks and init_model is not tested fit.__doc__ = ( @@ -1008,11 +1279,28 @@ def fit( sample_weight: Optional[_DaskCollection] = None, init_score: Optional[_DaskCollection] = None, group: Optional[_DaskCollection] = None, + eval_set: Optional[List[Tuple[_DaskCollection, _DaskCollection]]] = None, + eval_names: Optional[List[str]] = None, + eval_sample_weight: Optional[List[_DaskCollection]] = None, + eval_init_score: Optional[List[_DaskCollection]] = None, + eval_group: Optional[List[_DaskCollection]] = None, + eval_metric: Optional[Union[Callable, str, List[Union[Callable, str]]]] = None, + eval_at: Optional[List[int]] = None, + early_stopping_rounds: Optional[int] = None, **kwargs: Any ) -> "DaskLGBMRanker": """Docstring is inherited from the lightgbm.LGBMRanker.fit.""" - if init_score is not None: - raise RuntimeError('init_score is not currently supported in lightgbm.dask') + not_supported = ['init_score', 'eval_init_score'] + for ns in not_supported: + if eval(ns) is not None: + raise RuntimeError(f'{ns} is not currently supported in lightgbm.dask') + + if eval_metric: + kwargs['eval_metric'] = eval_metric + if eval_at: + kwargs['eval_at'] = eval_at + if early_stopping_rounds: + kwargs['early_stopping_rounds'] = early_stopping_rounds return self._lgb_dask_fit( model_factory=LGBMRanker, @@ -1020,6 +1308,10 @@ def fit( y=y, sample_weight=sample_weight, group=group, + eval_set=eval_set, + eval_names=eval_names, + eval_sample_weight=eval_sample_weight, + eval_group=eval_group, **kwargs ) @@ -1027,15 +1319,18 @@ def fit( 
X_shape="Dask Array or Dask DataFrame of shape = [n_samples, n_features]", y_shape="Dask Array, Dask DataFrame or Dask Series of shape = [n_samples]", sample_weight_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)", - group_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)" + group_shape="Dask Array, Dask DataFrame, Dask Series of shape = [n_samples] or None, optional (default=None)", + eval_sample_weight_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)', + eval_init_score_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)', + eval_group_shape='List of Dask Arrays, Dask DataFrames, Dask Series or None, optional (default=None)' ) - # DaskLGBMRanker does not support init_score, evaluation data, or early stopping + # DaskLGBMRanker does not support init_score or eval_init_score. _base_doc = (_base_doc[:_base_doc.find('init_score :')] + _base_doc[_base_doc.find('init_score :'):]) - _base_doc = (_base_doc[:_base_doc.find('eval_set :')] - + _base_doc[_base_doc.find('verbose :'):]) + _base_doc = (_base_doc[:_base_doc.find('eval_init_score :')] + + _base_doc[_base_doc.find('eval_init_score :'):]) # DaskLGBMRanker support for callbacks and init_model is not tested fit.__doc__ = ( diff --git a/python-package/lightgbm/sklearn.py b/python-package/lightgbm/sklearn.py index d6b882114c6a..2c50fa2f4599 100644 --- a/python-package/lightgbm/sklearn.py +++ b/python-package/lightgbm/sklearn.py @@ -201,13 +201,13 @@ def __call__(self, preds, dataset): A list of (X, y) tuple pairs to use as validation sets. eval_names : list of strings or None, optional (default=None) Names of eval_set. - eval_sample_weight : list of arrays or None, optional (default=None) + eval_sample_weight : {eval_sample_weight_shape} Weights of eval data. eval_class_weight : list or None, optional (default=None) Class weights of eval data. - eval_init_score : list of arrays or None, optional (default=None) + eval_init_score : {eval_init_score_shape} Init score of eval data. - eval_group : list of arrays or None, optional (default=None) + eval_group : {eval_group_shape} Group data of eval data. eval_metric : string, callable, list or None, optional (default=None) If string, it should be a built-in evaluation metric to use. 
@@ -706,7 +706,10 @@ def _get_meta_data(collection, name, i): X_shape="array-like or sparse matrix of shape = [n_samples, n_features]", y_shape="array-like of shape = [n_samples]", sample_weight_shape="array-like of shape = [n_samples] or None, optional (default=None)", - group_shape="array-like or None, optional (default=None)" + group_shape="array-like or None, optional (default=None)", + eval_sample_weight_shape="list of arrays or None, optional (default=None)", + eval_init_score_shape="list of arrays or None, optional (default=None)", + eval_group_shape="list of arrays or None, optional (default=None)" ) + "\n\n" + _lgbmmodel_doc_custom_eval_note def predict(self, X, raw_score=False, start_iteration=0, num_iteration=None, diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 3aca753e6bb6..e7258b81e53e 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -59,10 +59,13 @@ def listen_port(): listen_port.port = 13000 -def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs): +def _create_ranking_data(n_samples=100, output='array', chunk_size=50, random_weights=True, **kwargs): X, y, g = make_ranking(n_samples=n_samples, random_state=42, **kwargs) rnd = np.random.RandomState(42) - w = rnd.rand(X.shape[0]) * 0.01 + w = rnd.random(X.shape[0]) * 0.01 + if not random_weights: + w = np.ones([X.shape[0]]) + g_rle = np.array([len(list(grp)) for _, grp in groupby(g)]) if output.startswith('dataframe'): @@ -115,15 +118,20 @@ def _create_ranking_data(n_samples=100, output='array', chunk_size=50, **kwargs) return X, y, w, g_rle, dX, dy, dw, dg -def _create_data(objective, n_samples=100, centers=2, output='array', chunk_size=50): +def _create_data(objective, n_samples=100, n_features=None, centers=2, output='array', chunk_size=50, random_weights=True): if objective == 'classification': - X, y = make_blobs(n_samples=n_samples, centers=centers, random_state=42) + n_features = n_features if n_features else 2 + X, y = make_blobs(n_samples=n_samples, n_features=n_features, centers=centers, random_state=42) elif objective == 'regression': - X, y = make_regression(n_samples=n_samples, random_state=42) + n_features = n_features if n_features else 100 + X, y = make_regression(n_samples=n_samples, n_features=n_features, random_state=42) else: raise ValueError("Unknown objective '%s'" % objective) rnd = np.random.RandomState(42) + weights = rnd.random(X.shape[0]) * 0.01 + if not random_weights: + weights = np.ones([X.shape[0]]) if output == 'array': dX = da.from_array(X, (chunk_size, X.shape[1])) @@ -623,6 +631,313 @@ def test_ranker(output, client, group): client.close(timeout=CLIENT_CLOSE_TIMEOUT) +@pytest.mark.parametrize('task', tasks) +@pytest.mark.parametrize('eval_sizes', [[0.9], [0.5, 1], [0]]) +@pytest.mark.parametrize('eval_names_prefix', ['specified', None]) +def test_eval_set_with_early_stopping(task, eval_sizes, eval_names_prefix, client): + + # use larger number of samples to prevent faux early stopping whereby + # boosting stops on accident because each worker has few data points and achieves 0 loss. 
+ n_samples = 1000 + n_features = 10 + n_eval_sets = len(eval_sizes) + early_stopping_rounds = 1 + eval_set = [] + eval_sample_weight = [] + + if eval_names_prefix: + eval_names = [eval_names_prefix + f'_{i}' for i in range(len(eval_sizes))] + else: + eval_names = None + + if task == 'ranking': + # Do not use random sample weights for eval set, as this will + # prevent random validation set from being useful for early stopping. + # Use fewer features to eliminate chance of terminating too much early by having fit to noise. + X, y, w, g, dX, dy, dw, dg = _create_ranking_data( + n_samples=n_samples, + n_features=n_features, + output='dataframe', + chunk_size=10, + random_gs=True, + random_weights=False + ) + model_factory = lgb.DaskLGBMRanker + eval_metrics = ['ndcg'] + eval_at = [5, 10] + eval_group = [] + + # create eval* datasets for ranking task. + for eval_size in eval_sizes: + if eval_size == 1: + dX_e = dX + dy_e = dy + dw_e = dw + dg_e = dg + else: + _, _, _, _, dX_e, dy_e, dw_e, dg_e = _create_ranking_data( + n_samples=max(10, int(n_samples * eval_size)), + n_features=n_features, + output='dataframe', + chunk_size=10, + random_gs=True, + random_weights=False + ) + + eval_set.append((dX_e, dy_e)) + eval_sample_weight.append(dw_e) + eval_group.append(dg_e) + + else: + X, y, w, dX, dy, dw = _create_data( + n_samples=n_samples, + n_features=n_features, + objective=task, + output='array', + chunk_size=10, + random_weights=False + ) + dg = None + eval_at = None + eval_group = None + if task == 'classification': + model_factory = lgb.DaskLGBMClassifier + eval_metrics = ['binary_error', 'auc'] + elif task == 'regression': + model_factory = lgb.DaskLGBMRegressor + eval_metrics = ['rmse'] + + for eval_size in eval_sizes: + if eval_size == 1: + dX_e = dX + dy_e = dy + dw_e = dw + else: + _, _, _, dX_e, dy_e, dw_e = _create_data( + n_samples=max(10, int(n_samples * eval_size)), + n_features=n_features, + objective=task, + output='array', + chunk_size=10, + random_weights=False + ) + + eval_set.append((dX_e, dy_e)) + eval_sample_weight.append(dw_e) + + full_trees = 200 + params = { + "random_state": 42, + "n_estimators": full_trees, + "num_leaves": 31, + "first_metric_only": True + } + + dask_model = model_factory( + client=client, + **params + ) + + # when eval_size is exactly 1 partition, not enough eval_set data to reach each worker. + if eval_sizes == [0]: + with pytest.raises(RuntimeError, match='eval_set was provided but worker .* was not allocated validation data'): + dask_model.fit( + dX, + dy, + group=dg, + eval_set=eval_set, + eval_names=eval_names, + eval_sample_weight=eval_sample_weight, + eval_group=eval_group, + eval_metric=eval_metrics, + early_stopping_rounds=early_stopping_rounds, + eval_at=eval_at + ) + + else: + dask_model = dask_model.fit( + dX, + dy, + group=dg, + eval_set=eval_set, + eval_names=eval_names, + eval_sample_weight=eval_sample_weight, + eval_group=eval_group, + eval_metric=eval_metrics, + early_stopping_rounds=early_stopping_rounds, + eval_at=eval_at, + verbose=True + ) + fitted_trees = dask_model.booster_.num_trees() + assert fitted_trees < full_trees + assert dask_model.best_iteration_ < full_trees + + # be sure that model still produces decent output. 
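
The `_accuracy_score` and `_r2_score` helpers used just below are pre-existing utilities in this test module that reduce Dask collections to a scalar; roughly like the following (a sketch only, the real helpers may differ in detail):

    import dask.array as da

    def _accuracy_score(dy_true, dy_pred):
        # share of matching labels, evaluated lazily and then materialized
        return da.average(dy_true == dy_pred).compute()

    def _r2_score(dy_true, dy_pred):
        # 1 - SS_res / SS_tot, the usual coefficient of determination
        ss_res = ((dy_true - dy_pred) ** 2).sum()
        ss_tot = ((dy_true - dy_true.mean()) ** 2).sum()
        return float(1 - ss_res / ss_tot)
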
+ p1 = dask_model.predict(dX) + if task == 'classification': + p1_acc = _accuracy_score(dy, p1) + msg = f'binary accuracy score of predictions with actuals was <= 0.8 ({p1_acc})' + assert p1_acc > 0.8, msg + + elif task == 'regression': + p1_r2 = _r2_score(dy, p1) + msg = f'r2 score of predictions with actuals was <= 0.8 ({p1_r2})' + assert p1_r2 > 0.8, msg + + else: + p1_cor = spearmanr(p1.compute(), y).correlation + msg = f'spearman correlation of predictions with actuals was < 0.8 ({p1_cor})' + assert p1_cor > 0.8, msg + + # check that evals_result contains expected eval_set names when provided. + n_rounds_tried = dask_model.best_iteration_ + early_stopping_rounds + evals_result = dask_model.evals_result_ + assert len(evals_result) == n_eval_sets + evals_result_names = list(evals_result.keys()) + if eval_names: + assert all(x in eval_names for x in evals_result_names) + + # check that evals_result names default to "training" or "valid_xx" without eval_names. + for evals_result_name in evals_result_names: + if not eval_names: + assert evals_result_name.startswith('training') or evals_result_name.startswith('valid') + + # check that eval_metric(s) are contained in evals_result dicts. + for i, metric in enumerate(eval_metrics): + if task == 'ranking': + metric += f'@{eval_at[i]}' + + assert metric in evals_result[evals_result_name] + + # len of each eval_metric should be number of fitted trees + early_stopping_rounds. + assert len(evals_result[evals_result_name][metric]) == n_rounds_tried + + # stopping decision should have been made based on the best score of the first of eval_metrics. + if i == 0: + best_score = dask_model.best_score_[evals_result_name][metric] + best_iter_zero_indexed = dask_model.best_iteration_ - 1 + + # distinguish between is_higher_better metrics. + if metric in ['ndcg']: + assert_eq(best_score, max(evals_result[evals_result_name][metric]), atol=0.03) + assert abs(best_iter_zero_indexed - np.argmax(evals_result[evals_result_name][metric])) \ + <= early_stopping_rounds + + else: + assert_eq(best_score, min(evals_result[evals_result_name][metric]), atol=0.03) + assert abs(best_iter_zero_indexed - np.argmin(evals_result[evals_result_name][metric])) \ + <= early_stopping_rounds + + client.close(timeout=CLIENT_CLOSE_TIMEOUT) + + +@pytest.mark.parametrize('task', tasks) +@pytest.mark.parametrize('eval_names_prefix', ['specified', None]) +def test_eval_set_without_early_stopping(task, eval_names_prefix, client): + + n_samples = 1000 + n_eval_samples = 500 + eval_set = [] + + if task == 'ranking': + _, _, _, _, dX, dy, _, dg = _create_ranking_data( + n_samples=n_samples, + output='dataframe', + chunk_size=10, + random_gs=True + ) + model_factory = lgb.DaskLGBMRanker + eval_metrics = ['ndcg', 'map'] + eval_at = [5, 10] + eval_group = [] + + # create eval data. 
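
For reference on the checks in the previous test, `evals_result_` follows the usual sklearn-style layout: one entry per eval set name (taken from `eval_names`, or defaulting to `training` / `valid_0`, `valid_1`, ...), each mapping a metric key (with `@k` appended for ranking metrics) to one value per boosting round tried. Illustrative values only:

    example_evals_result = {
        'specified_0': {
            'ndcg@5': [0.78, 0.83, 0.82, 0.82],  # len == best_iteration_ + early_stopping_rounds
            'map@10': [0.60, 0.66, 0.65, 0.64],
        },
    }
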
+ _, _, _, _, dX_e, dy_e, _, dg_e = _create_ranking_data( + n_samples=n_eval_samples, + output='dataframe', + chunk_size=10, + random_gs=True + ) + eval_set.append((dX_e, dy_e)) + eval_group.append(dg_e) + + else: + _, _, _, dX, dy, _ = _create_data( + n_samples=n_samples, + objective=task, + output='array', + chunk_size=10 + ) + dg = None + eval_at = None + eval_group = None + if task == 'classification': + model_factory = lgb.DaskLGBMClassifier + eval_metrics = ['binary_error', 'auc'] + elif task == 'regression': + model_factory = lgb.DaskLGBMRegressor + eval_metrics = ['l2', 'l1'] + + _, _, _, dX_e, dy_e, _ = _create_data( + n_samples=n_eval_samples, + objective=task, + output='array', + chunk_size=10 + ) + eval_set.append((dX_e, dy_e)) + + if eval_names_prefix: + eval_names = [eval_names_prefix + f'_{i}' for i in range(len(eval_set))] + else: + eval_names = None + + full_trees = 100 + params = { + "n_estimators": full_trees, + "num_leaves": 5 + } + + dask_model = model_factory( + client=client, + **params + ) + + dask_model = dask_model.fit( + dX, + dy, + group=dg, + eval_set=eval_set, + eval_names=eval_names, + eval_group=eval_group, + eval_metric=eval_metrics, + early_stopping_rounds=None, + eval_at=eval_at, + verbose=True + ) + + # check that early stopping was not applied. + fitted_trees = dask_model.booster_.num_trees() + assert fitted_trees == full_trees + assert dask_model.best_iteration_ is None + + # check that evals_result_ contains expected data. + evals_result = dask_model.evals_result_ + evals_result_name = list(evals_result.keys())[0] + if eval_names: + assert evals_result_name == eval_names[0] + else: + assert evals_result_name == 'valid_0' + + for i, metric in enumerate(eval_metrics): + if task == 'ranking': + metric += f'@{eval_at[i]}' + + assert metric in evals_result[evals_result_name] + assert len(evals_result[evals_result_name][metric]) == full_trees + + client.close(timeout=CLIENT_CLOSE_TIMEOUT) + + @pytest.mark.parametrize('task', tasks) def test_training_works_if_client_not_provided_or_set_after_construction(task, client): if task == 'ranking':
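
Finally, a hedged end-to-end sketch of the workflow these tests exercise, on a local cluster with synthetic data. Parameter values are arbitrary; the validation data is split into as many chunks as the training data so that every worker receives a slice of it (otherwise the RuntimeError raised in `_train_part` is the expected outcome):

    import dask.array as da
    import lightgbm as lgb
    from dask.distributed import Client, LocalCluster

    if __name__ == '__main__':
        with LocalCluster(n_workers=2, threads_per_worker=1) as cluster, Client(cluster) as client:
            dX = da.random.random((1000, 10), chunks=(100, 10))
            dy = da.random.random((1000,), chunks=(100,))
            dX_valid = da.random.random((500, 10), chunks=(50, 10))
            dy_valid = da.random.random((500,), chunks=(50,))

            model = lgb.DaskLGBMRegressor(client=client, n_estimators=200, num_leaves=31)
            model.fit(
                dX,
                dy,
                eval_set=[(dX_valid, dy_valid)],
                eval_names=['valid'],
                eval_metric='rmse',
                early_stopping_rounds=5,
            )
            print(model.best_iteration_)
            print(list(model.evals_result_['valid'].keys()))  # metrics recorded for the validation set
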