From e09dbf32f9d714720be95ecbc24d9b4fa5043b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20MZ?= Date: Fri, 2 Apr 2021 23:52:13 -0600 Subject: [PATCH 1/2] replace client fixture with cluster fixture --- tests/python_package_test/test_dask.py | 1545 ++++++++++++------------ 1 file changed, 770 insertions(+), 775 deletions(-) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 8d459aac4f14..5d502db1a67b 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -28,7 +28,6 @@ import sklearn.utils.estimator_checks as sklearn_checks from dask.array.utils import assert_eq from dask.distributed import Client, LocalCluster, default_client, wait -from distributed.utils_test import client, cluster_fixture, gen_cluster, loop from pkg_resources import parse_version from scipy.sparse import csr_matrix from scipy.stats import spearmanr @@ -39,10 +38,6 @@ sk_version = parse_version(sk_version) -# time, in seconds, to wait for the Dask client to close. Used to avoid teardown errors -# see https://distributed.dask.org/en/latest/api.html#distributed.Client.close -CLIENT_CLOSE_TIMEOUT = 120 - tasks = ['binary-classification', 'multiclass-classification', 'regression', 'ranking'] distributed_training_algorithms = ['data', 'voting'] data_output = ['array', 'scipy_csr_matrix', 'dataframe', 'dataframe-with-categorical'] @@ -68,6 +63,20 @@ ] +@pytest.fixture(scope='module') +def cluster(): + dask_cluster = LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=None) + yield dask_cluster + dask_cluster.close() + + +@pytest.fixture(scope='module') +def cluster2(): + dask_cluster = LocalCluster(n_workers=2, threads_per_worker=2, dashboard_address=None) + yield dask_cluster + dask_cluster.close() + + @pytest.fixture() def listen_port(): listen_port.port += 10 @@ -237,556 +246,548 @@ def _unpickle(filepath, serializer): @pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification']) @pytest.mark.parametrize('boosting_type', boosting_types) @pytest.mark.parametrize('tree_learner', distributed_training_algorithms) -def test_classifier(output, task, boosting_type, tree_learner, client): - X, y, w, _, dX, dy, dw, _ = _create_data( - objective=task, - output=output - ) - - params = { - "boosting_type": boosting_type, - "tree_learner": tree_learner, - "n_estimators": 50, - "num_leaves": 31 - } - if boosting_type == 'rf': - params.update({ - 'bagging_freq': 1, - 'bagging_fraction': 0.9, - }) - elif boosting_type == 'goss': - params['top_rate'] = 0.5 - - dask_classifier = lgb.DaskLGBMClassifier( - client=client, - time_out=5, - **params - ) - dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw) - p1 = dask_classifier.predict(dX) - p1_proba = dask_classifier.predict_proba(dX).compute() - p1_pred_leaf = dask_classifier.predict(dX, pred_leaf=True) - p1_local = dask_classifier.to_local().predict(X) - s1 = _accuracy_score(dy, p1) - p1 = p1.compute() - - local_classifier = lgb.LGBMClassifier(**params) - local_classifier.fit(X, y, sample_weight=w) - p2 = local_classifier.predict(X) - p2_proba = local_classifier.predict_proba(X) - s2 = local_classifier.score(X, y) - - if boosting_type == 'rf': - # https://github.com/microsoft/LightGBM/issues/4118 - assert_eq(s1, s2, atol=0.01) - assert_eq(p1_proba, p2_proba, atol=0.8) - else: - assert_eq(s1, s2) - assert_eq(p1, p2) - assert_eq(p1, y) - assert_eq(p2, y) - assert_eq(p1_proba, p2_proba, atol=0.03) - assert_eq(p1_local, p2) - assert_eq(p1_local, y) - - # pref_leaf values should have the right shape - # and values that look like valid tree nodes - pred_leaf_vals = p1_pred_leaf.compute() - assert pred_leaf_vals.shape == ( - X.shape[0], - dask_classifier.booster_.num_trees() - ) - assert np.max(pred_leaf_vals) <= params['num_leaves'] - assert np.min(pred_leaf_vals) >= 0 - assert len(np.unique(pred_leaf_vals)) <= params['num_leaves'] - - # be sure LightGBM actually used at least one categorical column, - # and that it was correctly treated as a categorical feature - if output == 'dataframe-with-categorical': - cat_cols = [ - col for col in dX.columns - if dX.dtypes[col].name == 'category' - ] - tree_df = dask_classifier.booster_.trees_to_dataframe() - node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) - assert node_uses_cat_col.sum() > 0 - assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_classifier(output, task, boosting_type, tree_learner, cluster): + with Client(cluster) as client: + X, y, w, _, dX, dy, dw, _ = _create_data( + objective=task, + output=output + ) + + params = { + "boosting_type": boosting_type, + "tree_learner": tree_learner, + "n_estimators": 50, + "num_leaves": 31 + } + if boosting_type == 'rf': + params.update({ + 'bagging_freq': 1, + 'bagging_fraction': 0.9, + }) + elif boosting_type == 'goss': + params['top_rate'] = 0.5 + + dask_classifier = lgb.DaskLGBMClassifier( + client=client, + time_out=5, + **params + ) + dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw) + p1 = dask_classifier.predict(dX) + p1_proba = dask_classifier.predict_proba(dX).compute() + p1_pred_leaf = dask_classifier.predict(dX, pred_leaf=True) + p1_local = dask_classifier.to_local().predict(X) + s1 = _accuracy_score(dy, p1) + p1 = p1.compute() + + local_classifier = lgb.LGBMClassifier(**params) + local_classifier.fit(X, y, sample_weight=w) + p2 = local_classifier.predict(X) + p2_proba = local_classifier.predict_proba(X) + s2 = local_classifier.score(X, y) + + if boosting_type == 'rf': + # https://github.com/microsoft/LightGBM/issues/4118 + assert_eq(s1, s2, atol=0.01) + assert_eq(p1_proba, p2_proba, atol=0.8) + else: + assert_eq(s1, s2) + assert_eq(p1, p2) + assert_eq(p1, y) + assert_eq(p2, y) + assert_eq(p1_proba, p2_proba, atol=0.03) + assert_eq(p1_local, p2) + assert_eq(p1_local, y) + + # pref_leaf values should have the right shape + # and values that look like valid tree nodes + pred_leaf_vals = p1_pred_leaf.compute() + assert pred_leaf_vals.shape == ( + X.shape[0], + dask_classifier.booster_.num_trees() + ) + assert np.max(pred_leaf_vals) <= params['num_leaves'] + assert np.min(pred_leaf_vals) >= 0 + assert len(np.unique(pred_leaf_vals)) <= params['num_leaves'] + + # be sure LightGBM actually used at least one categorical column, + # and that it was correctly treated as a categorical feature + if output == 'dataframe-with-categorical': + cat_cols = [ + col for col in dX.columns + if dX.dtypes[col].name == 'category' + ] + tree_df = dask_classifier.booster_.trees_to_dataframe() + node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) + assert node_uses_cat_col.sum() > 0 + assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('task', ['binary-classification', 'multiclass-classification']) -def test_classifier_pred_contrib(output, task, client): - X, y, w, _, dX, dy, dw, _ = _create_data( - objective=task, - output=output - ) - - params = { - "n_estimators": 10, - "num_leaves": 10 - } - - dask_classifier = lgb.DaskLGBMClassifier( - client=client, - time_out=5, - tree_learner='data', - **params - ) - dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw) - preds_with_contrib = dask_classifier.predict(dX, pred_contrib=True).compute() - - local_classifier = lgb.LGBMClassifier(**params) - local_classifier.fit(X, y, sample_weight=w) - local_preds_with_contrib = local_classifier.predict(X, pred_contrib=True) - - if output == 'scipy_csr_matrix': - preds_with_contrib = np.array(preds_with_contrib.todense()) - - # be sure LightGBM actually used at least one categorical column, - # and that it was correctly treated as a categorical feature - if output == 'dataframe-with-categorical': - cat_cols = [ - col for col in dX.columns - if dX.dtypes[col].name == 'category' - ] - tree_df = dask_classifier.booster_.trees_to_dataframe() - node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) - assert node_uses_cat_col.sum() > 0 - assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' - - # shape depends on whether it is binary or multiclass classification - num_features = dask_classifier.n_features_ - num_classes = dask_classifier.n_classes_ - if num_classes == 2: - expected_num_cols = num_features + 1 - else: - expected_num_cols = (num_features + 1) * num_classes - - # * shape depends on whether it is binary or multiclass classification - # * matrix for binary classification is of the form [feature_contrib, base_value], - # for multi-class it's [feat_contrib_class1, base_value_class1, feat_contrib_class2, base_value_class2, etc.] - # * contrib outputs for distributed training are different than from local training, so we can just test - # that the output has the right shape and base values are in the right position - assert preds_with_contrib.shape[1] == expected_num_cols - assert preds_with_contrib.shape == local_preds_with_contrib.shape - - if num_classes == 2: - assert len(np.unique(preds_with_contrib[:, num_features]) == 1) - else: - for i in range(num_classes): - base_value_col = num_features * (i + 1) + i - assert len(np.unique(preds_with_contrib[:, base_value_col]) == 1) - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_classifier_pred_contrib(output, task, cluster): + with Client(cluster) as client: + X, y, w, _, dX, dy, dw, _ = _create_data( + objective=task, + output=output + ) + params = { + "n_estimators": 10, + "num_leaves": 10 + } -def test_find_random_open_port(client): - for _ in range(5): - worker_address_to_port = client.run(lgb.dask._find_random_open_port) - found_ports = worker_address_to_port.values() - # check that found ports are different for same address (LocalCluster) - assert len(set(found_ports)) == len(found_ports) - # check that the ports are indeed open - for port in found_ports: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('', port)) - client.close(timeout=CLIENT_CLOSE_TIMEOUT) - - -def test_possibly_fix_worker_map(capsys, client): - client.wait_for_workers(2) - worker_addresses = list(client.scheduler_info()["workers"].keys()) - - retry_msg = 'Searching for a LightGBM training port for worker' - - # should handle worker maps without any duplicates - map_without_duplicates = { - worker_address: 12400 + i - for i, worker_address in enumerate(worker_addresses) - } - patched_map = lgb.dask._possibly_fix_worker_map_duplicates( - client=client, - worker_map=map_without_duplicates - ) - assert patched_map == map_without_duplicates - assert retry_msg not in capsys.readouterr().out - - # should handle worker maps with duplicates - map_with_duplicates = { - worker_address: 12400 - for i, worker_address in enumerate(worker_addresses) - } - patched_map = lgb.dask._possibly_fix_worker_map_duplicates( - client=client, - worker_map=map_with_duplicates - ) - assert retry_msg in capsys.readouterr().out - assert len(set(patched_map.values())) == len(worker_addresses) - - -def test_training_does_not_fail_on_port_conflicts(client): - _, _, _, _, dX, dy, dw, _ = _create_data('binary-classification', output='array') - - lightgbm_default_port = 12400 - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.bind(('127.0.0.1', lightgbm_default_port)) dask_classifier = lgb.DaskLGBMClassifier( client=client, time_out=5, - n_estimators=5, - num_leaves=5 + tree_learner='data', + **params ) + dask_classifier = dask_classifier.fit(dX, dy, sample_weight=dw) + preds_with_contrib = dask_classifier.predict(dX, pred_contrib=True).compute() + + local_classifier = lgb.LGBMClassifier(**params) + local_classifier.fit(X, y, sample_weight=w) + local_preds_with_contrib = local_classifier.predict(X, pred_contrib=True) + + if output == 'scipy_csr_matrix': + preds_with_contrib = np.array(preds_with_contrib.todense()) + + # be sure LightGBM actually used at least one categorical column, + # and that it was correctly treated as a categorical feature + if output == 'dataframe-with-categorical': + cat_cols = [ + col for col in dX.columns + if dX.dtypes[col].name == 'category' + ] + tree_df = dask_classifier.booster_.trees_to_dataframe() + node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) + assert node_uses_cat_col.sum() > 0 + assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' + + # shape depends on whether it is binary or multiclass classification + num_features = dask_classifier.n_features_ + num_classes = dask_classifier.n_classes_ + if num_classes == 2: + expected_num_cols = num_features + 1 + else: + expected_num_cols = (num_features + 1) * num_classes + + # * shape depends on whether it is binary or multiclass classification + # * matrix for binary classification is of the form [feature_contrib, base_value], + # for multi-class it's [feat_contrib_class1, base_value_class1, feat_contrib_class2, base_value_class2, etc.] + # * contrib outputs for distributed training are different than from local training, so we can just test + # that the output has the right shape and base values are in the right position + assert preds_with_contrib.shape[1] == expected_num_cols + assert preds_with_contrib.shape == local_preds_with_contrib.shape + + if num_classes == 2: + assert len(np.unique(preds_with_contrib[:, num_features]) == 1) + else: + for i in range(num_classes): + base_value_col = num_features * (i + 1) + i + assert len(np.unique(preds_with_contrib[:, base_value_col]) == 1) + + +def test_find_random_open_port(cluster): + with Client(cluster) as client: for _ in range(5): - dask_classifier.fit( - X=dX, - y=dy, - sample_weight=dw, + worker_address_to_port = client.run(lgb.dask._find_random_open_port) + found_ports = worker_address_to_port.values() + # check that found ports are different for same address (LocalCluster) + assert len(set(found_ports)) == len(found_ports) + # check that the ports are indeed open + for port in found_ports: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('', port)) + + +def test_possibly_fix_worker_map(capsys, cluster): + with Client(cluster) as client: + worker_addresses = list(client.scheduler_info()["workers"].keys()) + + retry_msg = 'Searching for a LightGBM training port for worker' + + # should handle worker maps without any duplicates + map_without_duplicates = { + worker_address: 12400 + i + for i, worker_address in enumerate(worker_addresses) + } + patched_map = lgb.dask._possibly_fix_worker_map_duplicates( + client=client, + worker_map=map_without_duplicates + ) + assert patched_map == map_without_duplicates + assert retry_msg not in capsys.readouterr().out + + # should handle worker maps with duplicates + map_with_duplicates = { + worker_address: 12400 + for i, worker_address in enumerate(worker_addresses) + } + patched_map = lgb.dask._possibly_fix_worker_map_duplicates( + client=client, + worker_map=map_with_duplicates + ) + assert retry_msg in capsys.readouterr().out + assert len(set(patched_map.values())) == len(worker_addresses) + + +def test_training_does_not_fail_on_port_conflicts(cluster): + with Client(cluster) as client: + _, _, _, _, dX, dy, dw, _ = _create_data('binary-classification', output='array') + + lightgbm_default_port = 12400 + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(('127.0.0.1', lightgbm_default_port)) + dask_classifier = lgb.DaskLGBMClassifier( + client=client, + time_out=5, + n_estimators=5, + num_leaves=5 ) - assert dask_classifier.booster_ - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) + for _ in range(5): + dask_classifier.fit( + X=dX, + y=dy, + sample_weight=dw, + ) + assert dask_classifier.booster_ @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('boosting_type', boosting_types) @pytest.mark.parametrize('tree_learner', distributed_training_algorithms) -def test_regressor(output, boosting_type, tree_learner, client): - X, y, w, _, dX, dy, dw, _ = _create_data( - objective='regression', - output=output - ) - - params = { - "boosting_type": boosting_type, - "random_state": 42, - "num_leaves": 31, - "n_estimators": 20, - } - if boosting_type == 'rf': - params.update({ - 'bagging_freq': 1, - 'bagging_fraction': 0.9, - }) - - dask_regressor = lgb.DaskLGBMRegressor( - client=client, - time_out=5, - tree=tree_learner, - **params - ) - dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw) - p1 = dask_regressor.predict(dX) - p1_pred_leaf = dask_regressor.predict(dX, pred_leaf=True) - - s1 = _r2_score(dy, p1) - p1 = p1.compute() - p1_local = dask_regressor.to_local().predict(X) - s1_local = dask_regressor.to_local().score(X, y) - - local_regressor = lgb.LGBMRegressor(**params) - local_regressor.fit(X, y, sample_weight=w) - s2 = local_regressor.score(X, y) - p2 = local_regressor.predict(X) - - # Scores should be the same - assert_eq(s1, s2, atol=0.01) - assert_eq(s1, s1_local) - - # Predictions should be roughly the same. - assert_eq(p1, p1_local) - - # pref_leaf values should have the right shape - # and values that look like valid tree nodes - pred_leaf_vals = p1_pred_leaf.compute() - assert pred_leaf_vals.shape == ( - X.shape[0], - dask_regressor.booster_.num_trees() - ) - assert np.max(pred_leaf_vals) <= params['num_leaves'] - assert np.min(pred_leaf_vals) >= 0 - assert len(np.unique(pred_leaf_vals)) <= params['num_leaves'] - - assert_eq(p1, y, rtol=0.5, atol=50.) - assert_eq(p2, y, rtol=0.5, atol=50.) - - # be sure LightGBM actually used at least one categorical column, - # and that it was correctly treated as a categorical feature - if output == 'dataframe-with-categorical': - cat_cols = [ - col for col in dX.columns - if dX.dtypes[col].name == 'category' - ] - tree_df = dask_regressor.booster_.trees_to_dataframe() - node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) - assert node_uses_cat_col.sum() > 0 - assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_regressor(output, boosting_type, tree_learner, cluster): + with Client(cluster) as client: + X, y, w, _, dX, dy, dw, _ = _create_data( + objective='regression', + output=output + ) + + params = { + "boosting_type": boosting_type, + "random_state": 42, + "num_leaves": 31, + "n_estimators": 20, + } + if boosting_type == 'rf': + params.update({ + 'bagging_freq': 1, + 'bagging_fraction': 0.9, + }) + + dask_regressor = lgb.DaskLGBMRegressor( + client=client, + time_out=5, + tree=tree_learner, + **params + ) + dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw) + p1 = dask_regressor.predict(dX) + p1_pred_leaf = dask_regressor.predict(dX, pred_leaf=True) + + s1 = _r2_score(dy, p1) + p1 = p1.compute() + p1_local = dask_regressor.to_local().predict(X) + s1_local = dask_regressor.to_local().score(X, y) + + local_regressor = lgb.LGBMRegressor(**params) + local_regressor.fit(X, y, sample_weight=w) + s2 = local_regressor.score(X, y) + p2 = local_regressor.predict(X) + + # Scores should be the same + assert_eq(s1, s2, atol=0.01) + assert_eq(s1, s1_local) + + # Predictions should be roughly the same. + assert_eq(p1, p1_local) + + # pref_leaf values should have the right shape + # and values that look like valid tree nodes + pred_leaf_vals = p1_pred_leaf.compute() + assert pred_leaf_vals.shape == ( + X.shape[0], + dask_regressor.booster_.num_trees() + ) + assert np.max(pred_leaf_vals) <= params['num_leaves'] + assert np.min(pred_leaf_vals) >= 0 + assert len(np.unique(pred_leaf_vals)) <= params['num_leaves'] + + assert_eq(p1, y, rtol=0.5, atol=50.) + assert_eq(p2, y, rtol=0.5, atol=50.) + + # be sure LightGBM actually used at least one categorical column, + # and that it was correctly treated as a categorical feature + if output == 'dataframe-with-categorical': + cat_cols = [ + col for col in dX.columns + if dX.dtypes[col].name == 'category' + ] + tree_df = dask_regressor.booster_.trees_to_dataframe() + node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) + assert node_uses_cat_col.sum() > 0 + assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' @pytest.mark.parametrize('output', data_output) -def test_regressor_pred_contrib(output, client): - X, y, w, _, dX, dy, dw, _ = _create_data( - objective='regression', - output=output - ) - - params = { - "n_estimators": 10, - "num_leaves": 10 - } - - dask_regressor = lgb.DaskLGBMRegressor( - client=client, - time_out=5, - tree_learner='data', - **params - ) - dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw) - preds_with_contrib = dask_regressor.predict(dX, pred_contrib=True).compute() - - local_regressor = lgb.LGBMRegressor(**params) - local_regressor.fit(X, y, sample_weight=w) - local_preds_with_contrib = local_regressor.predict(X, pred_contrib=True) - - if output == "scipy_csr_matrix": - preds_with_contrib = np.array(preds_with_contrib.todense()) - - # contrib outputs for distributed training are different than from local training, so we can just test - # that the output has the right shape and base values are in the right position - num_features = dX.shape[1] - assert preds_with_contrib.shape[1] == num_features + 1 - assert preds_with_contrib.shape == local_preds_with_contrib.shape - - # be sure LightGBM actually used at least one categorical column, - # and that it was correctly treated as a categorical feature - if output == 'dataframe-with-categorical': - cat_cols = [ - col for col in dX.columns - if dX.dtypes[col].name == 'category' - ] - tree_df = dask_regressor.booster_.trees_to_dataframe() - node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) - assert node_uses_cat_col.sum() > 0 - assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_regressor_pred_contrib(output, cluster): + with Client(cluster) as client: + X, y, w, _, dX, dy, dw, _ = _create_data( + objective='regression', + output=output + ) + + params = { + "n_estimators": 10, + "num_leaves": 10 + } + + dask_regressor = lgb.DaskLGBMRegressor( + client=client, + time_out=5, + tree_learner='data', + **params + ) + dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw) + preds_with_contrib = dask_regressor.predict(dX, pred_contrib=True).compute() + + local_regressor = lgb.LGBMRegressor(**params) + local_regressor.fit(X, y, sample_weight=w) + local_preds_with_contrib = local_regressor.predict(X, pred_contrib=True) + + if output == "scipy_csr_matrix": + preds_with_contrib = np.array(preds_with_contrib.todense()) + + # contrib outputs for distributed training are different than from local training, so we can just test + # that the output has the right shape and base values are in the right position + num_features = dX.shape[1] + assert preds_with_contrib.shape[1] == num_features + 1 + assert preds_with_contrib.shape == local_preds_with_contrib.shape + + # be sure LightGBM actually used at least one categorical column, + # and that it was correctly treated as a categorical feature + if output == 'dataframe-with-categorical': + cat_cols = [ + col for col in dX.columns + if dX.dtypes[col].name == 'category' + ] + tree_df = dask_regressor.booster_.trees_to_dataframe() + node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) + assert node_uses_cat_col.sum() > 0 + assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' @pytest.mark.parametrize('output', data_output) @pytest.mark.parametrize('alpha', [.1, .5, .9]) -def test_regressor_quantile(output, client, alpha): - X, y, w, _, dX, dy, dw, _ = _create_data( - objective='regression', - output=output - ) - - params = { - "objective": "quantile", - "alpha": alpha, - "random_state": 42, - "n_estimators": 10, - "num_leaves": 10 - } - - dask_regressor = lgb.DaskLGBMRegressor( - client=client, - tree_learner_type='data_parallel', - **params - ) - dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw) - p1 = dask_regressor.predict(dX).compute() - q1 = np.count_nonzero(y < p1) / y.shape[0] - - local_regressor = lgb.LGBMRegressor(**params) - local_regressor.fit(X, y, sample_weight=w) - p2 = local_regressor.predict(X) - q2 = np.count_nonzero(y < p2) / y.shape[0] - - # Quantiles should be right - np.testing.assert_allclose(q1, alpha, atol=0.2) - np.testing.assert_allclose(q2, alpha, atol=0.2) - - # be sure LightGBM actually used at least one categorical column, - # and that it was correctly treated as a categorical feature - if output == 'dataframe-with-categorical': - cat_cols = [ - col for col in dX.columns - if dX.dtypes[col].name == 'category' - ] - tree_df = dask_regressor.booster_.trees_to_dataframe() - node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) - assert node_uses_cat_col.sum() > 0 - assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_regressor_quantile(output, alpha, cluster): + with Client(cluster) as client: + X, y, w, _, dX, dy, dw, _ = _create_data( + objective='regression', + output=output + ) + + params = { + "objective": "quantile", + "alpha": alpha, + "random_state": 42, + "n_estimators": 10, + "num_leaves": 10 + } + + dask_regressor = lgb.DaskLGBMRegressor( + client=client, + tree_learner_type='data_parallel', + **params + ) + dask_regressor = dask_regressor.fit(dX, dy, sample_weight=dw) + p1 = dask_regressor.predict(dX).compute() + q1 = np.count_nonzero(y < p1) / y.shape[0] + + local_regressor = lgb.LGBMRegressor(**params) + local_regressor.fit(X, y, sample_weight=w) + p2 = local_regressor.predict(X) + q2 = np.count_nonzero(y < p2) / y.shape[0] + + # Quantiles should be right + np.testing.assert_allclose(q1, alpha, atol=0.2) + np.testing.assert_allclose(q2, alpha, atol=0.2) + + # be sure LightGBM actually used at least one categorical column, + # and that it was correctly treated as a categorical feature + if output == 'dataframe-with-categorical': + cat_cols = [ + col for col in dX.columns + if dX.dtypes[col].name == 'category' + ] + tree_df = dask_regressor.booster_.trees_to_dataframe() + node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) + assert node_uses_cat_col.sum() > 0 + assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' @pytest.mark.parametrize('output', ['array', 'dataframe', 'dataframe-with-categorical']) @pytest.mark.parametrize('group', [None, group_sizes]) @pytest.mark.parametrize('boosting_type', boosting_types) @pytest.mark.parametrize('tree_learner', distributed_training_algorithms) -def test_ranker(output, group, boosting_type, tree_learner, client): - if output == 'dataframe-with-categorical': - X, y, w, g, dX, dy, dw, dg = _create_data( - objective='ranking', - output=output, - group=group, - n_features=1, - n_informative=1 +def test_ranker(output, group, boosting_type, tree_learner, cluster): + with Client(cluster) as client: + if output == 'dataframe-with-categorical': + X, y, w, g, dX, dy, dw, dg = _create_data( + objective='ranking', + output=output, + group=group, + n_features=1, + n_informative=1 + ) + else: + X, y, w, g, dX, dy, dw, dg = _create_data( + objective='ranking', + output=output, + group=group + ) + + # rebalance small dask.Array dataset for better performance. + if output == 'array': + dX = dX.persist() + dy = dy.persist() + dw = dw.persist() + dg = dg.persist() + _ = wait([dX, dy, dw, dg]) + client.rebalance() + + # use many trees + leaves to overfit, help ensure that Dask data-parallel strategy matches that of + # serial learner. See https://github.com/microsoft/LightGBM/issues/3292#issuecomment-671288210. + params = { + "boosting_type": boosting_type, + "random_state": 42, + "n_estimators": 50, + "num_leaves": 20, + "min_child_samples": 1 + } + if boosting_type == 'rf': + params.update({ + 'bagging_freq': 1, + 'bagging_fraction': 0.9, + }) + + dask_ranker = lgb.DaskLGBMRanker( + client=client, + time_out=5, + tree_learner_type=tree_learner, + **params ) - else: - X, y, w, g, dX, dy, dw, dg = _create_data( - objective='ranking', - output=output, - group=group + dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg) + rnkvec_dask = dask_ranker.predict(dX) + rnkvec_dask = rnkvec_dask.compute() + p1_pred_leaf = dask_ranker.predict(dX, pred_leaf=True) + rnkvec_dask_local = dask_ranker.to_local().predict(X) + + local_ranker = lgb.LGBMRanker(**params) + local_ranker.fit(X, y, sample_weight=w, group=g) + rnkvec_local = local_ranker.predict(X) + + # distributed ranker should be able to rank decently well and should + # have high rank correlation with scores from serial ranker. + dcor = spearmanr(rnkvec_dask, y).correlation + assert dcor > 0.6 + assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.8 + assert_eq(rnkvec_dask, rnkvec_dask_local) + + # pref_leaf values should have the right shape + # and values that look like valid tree nodes + pred_leaf_vals = p1_pred_leaf.compute() + assert pred_leaf_vals.shape == ( + X.shape[0], + dask_ranker.booster_.num_trees() ) + assert np.max(pred_leaf_vals) <= params['num_leaves'] + assert np.min(pred_leaf_vals) >= 0 + assert len(np.unique(pred_leaf_vals)) <= params['num_leaves'] - # rebalance small dask.Array dataset for better performance. - if output == 'array': - dX = dX.persist() - dy = dy.persist() - dw = dw.persist() - dg = dg.persist() - _ = wait([dX, dy, dw, dg]) - client.rebalance() - - # use many trees + leaves to overfit, help ensure that Dask data-parallel strategy matches that of - # serial learner. See https://github.com/microsoft/LightGBM/issues/3292#issuecomment-671288210. - params = { - "boosting_type": boosting_type, - "random_state": 42, - "n_estimators": 50, - "num_leaves": 20, - "min_child_samples": 1 - } - if boosting_type == 'rf': - params.update({ - 'bagging_freq': 1, - 'bagging_fraction': 0.9, - }) - - dask_ranker = lgb.DaskLGBMRanker( - client=client, - time_out=5, - tree_learner_type=tree_learner, - **params - ) - dask_ranker = dask_ranker.fit(dX, dy, sample_weight=dw, group=dg) - rnkvec_dask = dask_ranker.predict(dX) - rnkvec_dask = rnkvec_dask.compute() - p1_pred_leaf = dask_ranker.predict(dX, pred_leaf=True) - rnkvec_dask_local = dask_ranker.to_local().predict(X) - - local_ranker = lgb.LGBMRanker(**params) - local_ranker.fit(X, y, sample_weight=w, group=g) - rnkvec_local = local_ranker.predict(X) - - # distributed ranker should be able to rank decently well and should - # have high rank correlation with scores from serial ranker. - dcor = spearmanr(rnkvec_dask, y).correlation - assert dcor > 0.6 - assert spearmanr(rnkvec_dask, rnkvec_local).correlation > 0.8 - assert_eq(rnkvec_dask, rnkvec_dask_local) - - # pref_leaf values should have the right shape - # and values that look like valid tree nodes - pred_leaf_vals = p1_pred_leaf.compute() - assert pred_leaf_vals.shape == ( - X.shape[0], - dask_ranker.booster_.num_trees() - ) - assert np.max(pred_leaf_vals) <= params['num_leaves'] - assert np.min(pred_leaf_vals) >= 0 - assert len(np.unique(pred_leaf_vals)) <= params['num_leaves'] - - # be sure LightGBM actually used at least one categorical column, - # and that it was correctly treated as a categorical feature - if output == 'dataframe-with-categorical': - cat_cols = [ - col for col in dX.columns - if dX.dtypes[col].name == 'category' - ] - tree_df = dask_ranker.booster_.trees_to_dataframe() - node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) - assert node_uses_cat_col.sum() > 0 - assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) + # be sure LightGBM actually used at least one categorical column, + # and that it was correctly treated as a categorical feature + if output == 'dataframe-with-categorical': + cat_cols = [ + col for col in dX.columns + if dX.dtypes[col].name == 'category' + ] + tree_df = dask_ranker.booster_.trees_to_dataframe() + node_uses_cat_col = tree_df['split_feature'].isin(cat_cols) + assert node_uses_cat_col.sum() > 0 + assert tree_df.loc[node_uses_cat_col, "decision_type"].unique()[0] == '==' @pytest.mark.parametrize('task', tasks) -def test_training_works_if_client_not_provided_or_set_after_construction(task, client): - _, _, _, _, dX, dy, _, dg = _create_data( - objective=task, - output='array', - group=None - ) - model_factory = task_to_dask_factory[task] - - params = { - "time_out": 5, - "n_estimators": 1, - "num_leaves": 2 - } - - # should be able to use the class without specifying a client - dask_model = model_factory(**params) - assert dask_model.client is None - with pytest.raises(lgb.compat.LGBMNotFittedError, match='Cannot access property client_ before calling fit'): - dask_model.client_ - - dask_model.fit(dX, dy, group=dg) - assert dask_model.fitted_ - assert dask_model.client is None - assert dask_model.client_ == client - - preds = dask_model.predict(dX) - assert isinstance(preds, da.Array) - assert dask_model.fitted_ - assert dask_model.client is None - assert dask_model.client_ == client - - local_model = dask_model.to_local() - with pytest.raises(AttributeError): - local_model.client - local_model.client_ - - # should be able to set client after construction - dask_model = model_factory(**params) - dask_model.set_params(client=client) - assert dask_model.client == client - - with pytest.raises(lgb.compat.LGBMNotFittedError, match='Cannot access property client_ before calling fit'): - dask_model.client_ - - dask_model.fit(dX, dy, group=dg) - assert dask_model.fitted_ - assert dask_model.client == client - assert dask_model.client_ == client - - preds = dask_model.predict(dX) - assert isinstance(preds, da.Array) - assert dask_model.fitted_ - assert dask_model.client == client - assert dask_model.client_ == client - - local_model = dask_model.to_local() - with pytest.raises(AttributeError): - local_model.client - local_model.client_ - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_training_works_if_client_not_provided_or_set_after_construction(task, cluster): + with Client(cluster) as client: + _, _, _, _, dX, dy, _, dg = _create_data( + objective=task, + output='array', + group=None + ) + model_factory = task_to_dask_factory[task] + + params = { + "time_out": 5, + "n_estimators": 1, + "num_leaves": 2 + } + + # should be able to use the class without specifying a client + dask_model = model_factory(**params) + assert dask_model.client is None + with pytest.raises(lgb.compat.LGBMNotFittedError, match='Cannot access property client_ before calling fit'): + dask_model.client_ + + dask_model.fit(dX, dy, group=dg) + assert dask_model.fitted_ + assert dask_model.client is None + assert dask_model.client_ == client + + preds = dask_model.predict(dX) + assert isinstance(preds, da.Array) + assert dask_model.fitted_ + assert dask_model.client is None + assert dask_model.client_ == client + + local_model = dask_model.to_local() + with pytest.raises(AttributeError): + local_model.client + local_model.client_ + + # should be able to set client after construction + dask_model = model_factory(**params) + dask_model.set_params(client=client) + assert dask_model.client == client + + with pytest.raises(lgb.compat.LGBMNotFittedError, match='Cannot access property client_ before calling fit'): + dask_model.client_ + + dask_model.fit(dX, dy, group=dg) + assert dask_model.fitted_ + assert dask_model.client == client + assert dask_model.client_ == client + + preds = dask_model.predict(dX) + assert isinstance(preds, da.Array) + assert dask_model.fitted_ + assert dask_model.client == client + assert dask_model.client_ == client + + local_model = dask_model.to_local() + with pytest.raises(AttributeError): + local_model.client + local_model.client_ @pytest.mark.parametrize('serializer', ['pickle', 'joblib', 'cloudpickle']) @pytest.mark.parametrize('task', tasks) @pytest.mark.parametrize('set_client', [True, False]) -def test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly(serializer, task, set_client, tmp_path): +def test_model_and_local_version_are_picklable_whether_or_not_client_set_explicitly(serializer, task, set_client, tmp_path, cluster, cluster2): - with LocalCluster(n_workers=2, threads_per_worker=1) as cluster1, Client(cluster1) as client1: + with Client(cluster) as client1: # data on cluster1 X_1, _, _, _, dX_1, dy_1, _, dg_1 = _create_data( objective=task, @@ -794,7 +795,7 @@ def test_model_and_local_version_are_picklable_whether_or_not_client_set_explici group=None ) - with LocalCluster(n_workers=2, threads_per_worker=1) as cluster2, Client(cluster2) as client2: + with Client(cluster2) as client2: # create identical data on cluster2 X_2, _, _, _, dX_2, dy_2, _, dg_2 = _create_data( objective=task, @@ -948,193 +949,190 @@ def test_model_and_local_version_are_picklable_whether_or_not_client_set_explici assert_eq(preds_orig_local, preds_loaded_model_local) -def test_warns_and_continues_on_unrecognized_tree_learner(client): - X = da.random.random((1e3, 10)) - y = da.random.random((1e3, 1)) - dask_regressor = lgb.DaskLGBMRegressor( - client=client, - time_out=5, - tree_learner='some-nonsense-value', - n_estimators=1, - num_leaves=2 - ) - with pytest.warns(UserWarning, match='Parameter tree_learner set to some-nonsense-value'): - dask_regressor = dask_regressor.fit(X, y) - - assert dask_regressor.fitted_ +def test_warns_and_continues_on_unrecognized_tree_learner(cluster): + with Client(cluster) as client: + X = da.random.random((1e3, 10)) + y = da.random.random((1e3, 1)) + dask_regressor = lgb.DaskLGBMRegressor( + client=client, + time_out=5, + tree_learner='some-nonsense-value', + n_estimators=1, + num_leaves=2 + ) + with pytest.warns(UserWarning, match='Parameter tree_learner set to some-nonsense-value'): + dask_regressor = dask_regressor.fit(X, y) - client.close(timeout=CLIENT_CLOSE_TIMEOUT) + assert dask_regressor.fitted_ @pytest.mark.parametrize('tree_learner', ['data_parallel', 'voting_parallel']) -def test_training_respects_tree_learner_aliases(tree_learner, client): - task = 'regression' - _, _, _, _, dX, dy, dw, dg = _create_data(objective=task, output='array') - dask_factory = task_to_dask_factory[task] - dask_model = dask_factory( - client=client, - tree_learner=tree_learner, - time_out=5, - n_estimators=10, - num_leaves=15 - ) - dask_model.fit(dX, dy, sample_weight=dw, group=dg) - - assert dask_model.fitted_ - assert dask_model.get_params()['tree_learner'] == tree_learner - - -def test_error_on_feature_parallel_tree_learner(client): - X = da.random.random((100, 10), chunks=(50, 10)) - y = da.random.random(100, chunks=50) - dask_regressor = lgb.DaskLGBMRegressor( - client=client, - time_out=5, - tree_learner='feature_parallel', - n_estimators=1, - num_leaves=2 - ) - with pytest.raises(lgb.basic.LightGBMError, match='Do not support feature parallel in c api'): - dask_regressor = dask_regressor.fit(X, y) - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) - - -@gen_cluster(client=True, timeout=None) -def test_errors(c, s, a, b): - def f(part): - raise Exception('foo') - - df = dd.demo.make_timeseries() - df = df.map_partitions(f, meta=df._meta) - with pytest.raises(Exception) as info: - yield lgb.dask._train( - client=c, - data=df, - label=df.x, - params={}, - model_factory=lgb.LGBMClassifier +def test_training_respects_tree_learner_aliases(tree_learner, cluster): + with Client(cluster) as client: + task = 'regression' + _, _, _, _, dX, dy, dw, dg = _create_data(objective=task, output='array') + dask_factory = task_to_dask_factory[task] + dask_model = dask_factory( + client=client, + tree_learner=tree_learner, + time_out=5, + n_estimators=10, + num_leaves=15 ) - assert 'foo' in str(info.value) + dask_model.fit(dX, dy, sample_weight=dw, group=dg) + + assert dask_model.fitted_ + assert dask_model.get_params()['tree_learner'] == tree_learner + + +def test_error_on_feature_parallel_tree_learner(cluster): + with Client(cluster) as client: + X = da.random.random((100, 10), chunks=(50, 10)) + y = da.random.random(100, chunks=50) + X, y = client.persist([X, y]) + client.rebalance() + dask_regressor = lgb.DaskLGBMRegressor( + client=client, + time_out=5, + tree_learner='feature_parallel', + n_estimators=1, + num_leaves=2 + ) + with pytest.raises(lgb.basic.LightGBMError, match='Do not support feature parallel in c api'): + dask_regressor = dask_regressor.fit(X, y) + + +def test_errors(cluster): + with Client(cluster) as client: + def f(part): + raise Exception('foo') + + df = dd.demo.make_timeseries() + df = df.map_partitions(f, meta=df._meta) + with pytest.raises(Exception) as info: + lgb.dask._train( + client=client, + data=df, + label=df.x, + params={}, + model_factory=lgb.LGBMClassifier + ) + assert 'foo' in str(info.value) @pytest.mark.parametrize('task', tasks) @pytest.mark.parametrize('output', data_output) -def test_training_succeeds_even_if_some_workers_do_not_have_any_data(client, task, output): +def test_training_succeeds_even_if_some_workers_do_not_have_any_data(task, output, cluster): if task == 'ranking' and output == 'scipy_csr_matrix': pytest.skip('LGBMRanker is not currently tested on sparse matrices') - def collection_to_single_partition(collection): - """Merge the parts of a Dask collection into a single partition.""" - if collection is None: - return - if isinstance(collection, da.Array): - return collection.rechunk(*collection.shape) - return collection.repartition(npartitions=1) - - X, y, w, g, dX, dy, dw, dg = _create_data( - objective=task, - output=output, - group=None - ) - - dask_model_factory = task_to_dask_factory[task] - local_model_factory = task_to_local_factory[task] - - dX = collection_to_single_partition(dX) - dy = collection_to_single_partition(dy) - dw = collection_to_single_partition(dw) - dg = collection_to_single_partition(dg) - - n_workers = len(client.scheduler_info()['workers']) - assert n_workers > 1 - assert dX.npartitions == 1 - - params = { - 'time_out': 5, - 'random_state': 42, - 'num_leaves': 10 - } - - dask_model = dask_model_factory(tree='data', client=client, **params) - dask_model.fit(dX, dy, group=dg, sample_weight=dw) - dask_preds = dask_model.predict(dX).compute() - - local_model = local_model_factory(**params) - if task == 'ranking': - local_model.fit(X, y, group=g, sample_weight=w) - else: - local_model.fit(X, y, sample_weight=w) - local_preds = local_model.predict(X) + with Client(cluster) as client: + def collection_to_single_partition(collection): + """Merge the parts of a Dask collection into a single partition.""" + if collection is None: + return + if isinstance(collection, da.Array): + return collection.rechunk(*collection.shape) + return collection.repartition(npartitions=1) - assert assert_eq(dask_preds, local_preds) + X, y, w, g, dX, dy, dw, dg = _create_data( + objective=task, + output=output, + group=None + ) + + dask_model_factory = task_to_dask_factory[task] + local_model_factory = task_to_local_factory[task] + + dX = collection_to_single_partition(dX) + dy = collection_to_single_partition(dy) + dw = collection_to_single_partition(dw) + dg = collection_to_single_partition(dg) + + n_workers = len(client.scheduler_info()['workers']) + assert n_workers > 1 + assert dX.npartitions == 1 + + params = { + 'time_out': 5, + 'random_state': 42, + 'num_leaves': 10 + } + + dask_model = dask_model_factory(tree='data', client=client, **params) + dask_model.fit(dX, dy, group=dg, sample_weight=dw) + dask_preds = dask_model.predict(dX).compute() - client.close(timeout=CLIENT_CLOSE_TIMEOUT) + local_model = local_model_factory(**params) + if task == 'ranking': + local_model.fit(X, y, group=g, sample_weight=w) + else: + local_model.fit(X, y, sample_weight=w) + local_preds = local_model.predict(X) + + assert assert_eq(dask_preds, local_preds) @pytest.mark.parametrize('task', tasks) -def test_network_params_not_required_but_respected_if_given(client, task, listen_port): - client.wait_for_workers(2) - - _, _, _, _, dX, dy, _, dg = _create_data( - objective=task, - output='array', - chunk_size=10, - group=None - ) - - dask_model_factory = task_to_dask_factory[task] - - # rebalance data to be sure that each worker has a piece of the data - client.rebalance() - - # model 1 - no network parameters given - dask_model1 = dask_model_factory( - n_estimators=5, - num_leaves=5, - ) - dask_model1.fit(dX, dy, group=dg) - assert dask_model1.fitted_ - params = dask_model1.get_params() - assert 'local_listen_port' not in params - assert 'machines' not in params - - # model 2 - machines given - n_workers = len(client.scheduler_info()['workers']) - open_ports = [lgb.dask._find_random_open_port() for _ in range(n_workers)] - dask_model2 = dask_model_factory( - n_estimators=5, - num_leaves=5, - machines=",".join([ - "127.0.0.1:" + str(port) - for port in open_ports - ]), - ) - - dask_model2.fit(dX, dy, group=dg) - assert dask_model2.fitted_ - params = dask_model2.get_params() - assert 'local_listen_port' not in params - assert 'machines' in params - - # model 3 - local_listen_port given - # training should fail because LightGBM will try to use the same - # port for multiple worker processes on the same machine - dask_model3 = dask_model_factory( - n_estimators=5, - num_leaves=5, - local_listen_port=listen_port - ) - error_msg = "has multiple Dask worker processes running on it" - with pytest.raises(lgb.basic.LightGBMError, match=error_msg): - dask_model3.fit(dX, dy, group=dg) - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_network_params_not_required_but_respected_if_given(task, listen_port, cluster): + with Client(cluster) as client: + _, _, _, _, dX, dy, _, dg = _create_data( + objective=task, + output='array', + chunk_size=10, + group=None + ) + + dask_model_factory = task_to_dask_factory[task] + + # rebalance data to be sure that each worker has a piece of the data + client.rebalance() + + # model 1 - no network parameters given + dask_model1 = dask_model_factory( + n_estimators=5, + num_leaves=5, + ) + dask_model1.fit(dX, dy, group=dg) + assert dask_model1.fitted_ + params = dask_model1.get_params() + assert 'local_listen_port' not in params + assert 'machines' not in params + + # model 2 - machines given + n_workers = len(client.scheduler_info()['workers']) + open_ports = [lgb.dask._find_random_open_port() for _ in range(n_workers)] + dask_model2 = dask_model_factory( + n_estimators=5, + num_leaves=5, + machines=",".join([ + "127.0.0.1:" + str(port) + for port in open_ports + ]), + ) + + dask_model2.fit(dX, dy, group=dg) + assert dask_model2.fitted_ + params = dask_model2.get_params() + assert 'local_listen_port' not in params + assert 'machines' in params + + # model 3 - local_listen_port given + # training should fail because LightGBM will try to use the same + # port for multiple worker processes on the same machine + dask_model3 = dask_model_factory( + n_estimators=5, + num_leaves=5, + local_listen_port=listen_port + ) + error_msg = "has multiple Dask worker processes running on it" + with pytest.raises(lgb.basic.LightGBMError, match=error_msg): + dask_model3.fit(dX, dy, group=dg) @pytest.mark.parametrize('task', tasks) -def test_machines_should_be_used_if_provided(task): - with LocalCluster(n_workers=2) as cluster, Client(cluster) as client: +def test_machines_should_be_used_if_provided(task, cluster): + with Client(cluster) as client: _, _, _, _, dX, dy, _, dg = _create_data( objective=task, output='array', @@ -1167,6 +1165,9 @@ def test_machines_should_be_used_if_provided(task): s.bind(('127.0.0.1', open_ports[0])) dask_model.fit(dX, dy, group=dg) + # The above error leaves a worker waiting + client.restart() + # an informative error should be raised if "machines" has duplicates one_open_port = lgb.dask._find_random_open_port() dask_model.set_params( @@ -1231,72 +1232,67 @@ def test_dask_methods_and_sklearn_equivalents_have_similar_signatures(methods): @pytest.mark.parametrize('task', tasks) -def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array( - task, - client, -): - _, _, _, _, dX, dy, dw, dg = _create_data( - objective=task, - output='dataframe', - group=None - ) - - model_factory = task_to_dask_factory[task] - - dy = dy.to_dask_array(lengths=True) - dy_col_array = dy.reshape(-1, 1) - assert len(dy_col_array.shape) == 2 and dy_col_array.shape[1] == 1 - - params = { - 'n_estimators': 1, - 'num_leaves': 3, - 'random_state': 0, - 'time_out': 5 - } - model = model_factory(**params) - model.fit(dX, dy_col_array, sample_weight=dw, group=dg) - assert model.fitted_ - - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_training_succeeds_when_data_is_dataframe_and_label_is_column_array(task, cluster): + with Client(cluster) as client: + _, _, _, _, dX, dy, dw, dg = _create_data( + objective=task, + output='dataframe', + group=None + ) + + model_factory = task_to_dask_factory[task] + + dy = dy.to_dask_array(lengths=True) + dy_col_array = dy.reshape(-1, 1) + assert len(dy_col_array.shape) == 2 and dy_col_array.shape[1] == 1 + + params = { + 'n_estimators': 1, + 'num_leaves': 3, + 'random_state': 0, + 'time_out': 5 + } + model = model_factory(**params) + model.fit(dX, dy_col_array, sample_weight=dw, group=dg) + assert model.fitted_ @pytest.mark.parametrize('task', tasks) @pytest.mark.parametrize('output', data_output) -def test_init_score(task, output, client): +def test_init_score(task, output, cluster): if task == 'ranking' and output == 'scipy_csr_matrix': pytest.skip('LGBMRanker is not currently tested on sparse matrices') - _, _, _, _, dX, dy, dw, dg = _create_data( - objective=task, - output=output, - group=None - ) - - model_factory = task_to_dask_factory[task] - - params = { - 'n_estimators': 1, - 'num_leaves': 2, - 'time_out': 5 - } - init_score = random.random() - # init_scores must be a 1D array, even for multiclass classification - # where you need to provide 1 score per class for each row in X - # https://github.com/microsoft/LightGBM/issues/4046 - size_factor = 1 - if task == 'multiclass-classification': - size_factor = 3 # number of classes - - if output.startswith('dataframe'): - init_scores = dy.map_partitions(lambda x: pd.Series([init_score] * x.size * size_factor)) - else: - init_scores = dy.map_blocks(lambda x: np.repeat(init_score, x.size * size_factor)) - model = model_factory(client=client, **params) - model.fit(dX, dy, sample_weight=dw, init_score=init_scores, group=dg) - # value of the root node is 0 when init_score is set - assert model.booster_.trees_to_dataframe()['value'][0] == 0 + with Client(cluster) as client: + _, _, _, _, dX, dy, dw, dg = _create_data( + objective=task, + output=output, + group=None + ) - client.close(timeout=CLIENT_CLOSE_TIMEOUT) + model_factory = task_to_dask_factory[task] + + params = { + 'n_estimators': 1, + 'num_leaves': 2, + 'time_out': 5 + } + init_score = random.random() + # init_scores must be a 1D array, even for multiclass classification + # where you need to provide 1 score per class for each row in X + # https://github.com/microsoft/LightGBM/issues/4046 + size_factor = 1 + if task == 'multiclass-classification': + size_factor = 3 # number of classes + + if output.startswith('dataframe'): + init_scores = dy.map_partitions(lambda x: pd.Series([init_score] * x.size * size_factor)) + else: + init_scores = dy.map_blocks(lambda x: np.repeat(init_score, x.size * size_factor)) + model = model_factory(client=client, **params) + model.fit(dX, dy, sample_weight=dw, init_score=init_scores, group=dg) + # value of the root node is 0 when init_score is set + assert model.booster_.trees_to_dataframe()['value'][0] == 0 def sklearn_checks_to_run(): @@ -1318,11 +1314,11 @@ def _tested_estimators(): @pytest.mark.parametrize("estimator", _tested_estimators()) @pytest.mark.parametrize("check", sklearn_checks_to_run()) -def test_sklearn_integration(estimator, check, client): - estimator.set_params(local_listen_port=18000, time_out=5) - name = type(estimator).__name__ - check(name, estimator) - client.close(timeout=CLIENT_CLOSE_TIMEOUT) +def test_sklearn_integration(estimator, check, cluster): + with Client(cluster) as client: + estimator.set_params(local_listen_port=18000, time_out=5) + name = type(estimator).__name__ + check(name, estimator) # this test is separate because it takes a not-yet-constructed estimator @@ -1338,39 +1334,38 @@ def test_parameters_default_constructible(estimator): @pytest.mark.parametrize('task', tasks) @pytest.mark.parametrize('output', data_output) -def test_predict_with_raw_score(task, output, client): +def test_predict_with_raw_score(task, output, cluster): if task == 'ranking' and output == 'scipy_csr_matrix': pytest.skip('LGBMRanker is not currently tested on sparse matrices') - _, _, _, _, dX, dy, _, dg = _create_data( - objective=task, - output=output, - group=None - ) - - model_factory = task_to_dask_factory[task] - params = { - 'client': client, - 'n_estimators': 1, - 'num_leaves': 2, - 'time_out': 5, - 'min_sum_hessian': 0 - } - model = model_factory(**params) - model.fit(dX, dy, group=dg) - raw_predictions = model.predict(dX, raw_score=True).compute() - - trees_df = model.booster_.trees_to_dataframe() - leaves_df = trees_df[trees_df.node_depth == 2] - if task == 'multiclass-classification': - for i in range(model.n_classes_): - class_df = leaves_df[leaves_df.tree_index == i] - assert set(raw_predictions[:, i]) == set(class_df['value']) - else: - assert set(raw_predictions) == set(leaves_df['value']) + with Client(cluster) as client: + _, _, _, _, dX, dy, _, dg = _create_data( + objective=task, + output=output, + group=None + ) - if task.endswith('classification'): - pred_proba_raw = model.predict_proba(dX, raw_score=True).compute() - assert_eq(raw_predictions, pred_proba_raw) + model_factory = task_to_dask_factory[task] + params = { + 'client': client, + 'n_estimators': 1, + 'num_leaves': 2, + 'time_out': 5, + 'min_sum_hessian': 0 + } + model = model_factory(**params) + model.fit(dX, dy, group=dg) + raw_predictions = model.predict(dX, raw_score=True).compute() + + trees_df = model.booster_.trees_to_dataframe() + leaves_df = trees_df[trees_df.node_depth == 2] + if task == 'multiclass-classification': + for i in range(model.n_classes_): + class_df = leaves_df[leaves_df.tree_index == i] + assert set(raw_predictions[:, i]) == set(class_df['value']) + else: + assert set(raw_predictions) == set(leaves_df['value']) - client.close(timeout=CLIENT_CLOSE_TIMEOUT) + if task.endswith('classification'): + pred_proba_raw = model.predict_proba(dX, raw_score=True).compute() + assert_eq(raw_predictions, pred_proba_raw) From 51f437fb09cb175c816a0d6ae2240f1358c5d0c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20MZ?= Date: Sat, 3 Apr 2021 21:33:07 -0600 Subject: [PATCH 2/2] wait on persist before rebalance --- tests/python_package_test/test_dask.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/python_package_test/test_dask.py b/tests/python_package_test/test_dask.py index 5d502db1a67b..2d3ac7c606a2 100644 --- a/tests/python_package_test/test_dask.py +++ b/tests/python_package_test/test_dask.py @@ -990,6 +990,7 @@ def test_error_on_feature_parallel_tree_learner(cluster): X = da.random.random((100, 10), chunks=(50, 10)) y = da.random.random(100, chunks=50) X, y = client.persist([X, y]) + _ = wait([X, y]) client.rebalance() dask_regressor = lgb.DaskLGBMRegressor( client=client,