From cff0733f2505d54346f6e4730bc0aac4c38677d4 Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 02:06:53 +0800 Subject: [PATCH 01/11] [dask] Add config for scheduler address. --- doc/tutorials/dask.rst | 24 ++++++++++++++++ python-package/xgboost/dask.py | 40 ++++++++++++++++++++------ tests/python-gpu/test_gpu_with_dask.py | 4 +-- tests/python/test_tracker.py | 2 +- tests/python/test_with_dask.py | 7 +++-- 5 files changed, 64 insertions(+), 13 deletions(-) diff --git a/doc/tutorials/dask.rst b/doc/tutorials/dask.rst index 84b8e9435fdb..ae59961da8f9 100644 --- a/doc/tutorials/dask.rst +++ b/doc/tutorials/dask.rst @@ -475,6 +475,30 @@ interface, including callback functions, custom evaluation metric and objective: ) +*************** +Tracker Host IP +*************** + +.. versionadded:: 1.6.0 + +In some environments XGBoost might fail to resolve the IP address of the scheduler. A +symptom is the user receiving an ``OSError: [Errno 99] Cannot assign requested address`` +error during training. A quick workaround is to specify the address explicitly through +the dask configuration: + +.. code-block:: python + + import dask + from distributed import Client + from xgboost import dask as dxgb + # let xgboost know the scheduler address + dask.config.set({"xgboost.scheduler_address": "192.0.0.100"}) + + with Client(scheduler_file="sched.json") as client: + reg = dxgb.DaskXGBRegressor() + +XGBoost will read the configuration before training.
+ ***************************************************************************** Why is the initialization of ``DaskDMatrix`` so slow and throws weird errors ***************************************************************************** diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 4b70a4fbee09..e85f8f25d509 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -136,10 +136,10 @@ async def __aexit__(self, *args: Any, **kwargs: Any) -> None: return MultiLock -def _start_tracker(n_workers: int) -> Dict[str, Any]: +def _start_tracker(n_workers: int, host_ip: Optional[str]) -> Dict[str, Any]: """Start Rabit tracker """ env: Dict[str, Union[int, str]] = {'DMLC_NUM_WORKER': n_workers} - host = get_host_ip('auto') + host = get_host_ip(host_ip) rabit_context = RabitTracker(hostIP=host, n_workers=n_workers, use_logger=False) env.update(rabit_context.worker_envs()) @@ -805,12 +805,27 @@ def _dmatrix_from_list_of_parts( return _create_dmatrix(**kwargs) -async def _get_rabit_args(n_workers: int, client: "distributed.Client") -> List[bytes]: - '''Get rabit context arguments from data distribution in DaskDMatrix.''' - env = await client.run_on_scheduler(_start_tracker, n_workers) +async def _get_rabit_args( + n_workers: int, dconfig: Optional[Dict[str, Any]], client: "distributed.Client" +) -> List[bytes]: + """Get rabit context arguments from data distribution in DaskDMatrix.""" + valid_config = ["scheduler_address"] + if dconfig is not None: + for k in dconfig: + if k not in valid_config: + raise ValueError(f"Unknown configuration: {k}") + host_ip: Optional[str] = dconfig.get("scheduler_address", None) + else: + host_ip = None + + env = await client.run_on_scheduler(_start_tracker, n_workers, host_ip) rabit_args = [f"{k}={v}".encode() for k, v in env.items()] return rabit_args + +def _get_dask_config() -> Optional[Dict[str, Any]]: + return dask.config.get("xgboost", default=None) + # train and predict methods are 
supposed to be "functional", which meets the # dask paradigm. But as a side effect, the `evals_result` in single-node API # is no longer supported since it mutates the input parameter, and it's not @@ -837,6 +852,7 @@ def _get_workers_from_data( async def _train_async( client: "distributed.Client", global_config: Dict[str, Any], + dconfig: Optional[Dict[str, Any]], params: Dict[str, Any], dtrain: DaskDMatrix, num_boost_round: int, @@ -850,7 +866,7 @@ async def _train_async( custom_metric: Optional[Metric], ) -> Optional[TrainReturnT]: workers = _get_workers_from_data(dtrain, evals) - _rabit_args = await _get_rabit_args(len(workers), client) + _rabit_args = await _get_rabit_args(len(workers), dconfig, client) if params.get("booster", None) == "gblinear": raise NotImplementedError( @@ -948,7 +964,7 @@ def dispatched_train( @_deprecate_positional_args -def train( # pylint: disable=unused-argument +def train( # pylint: disable=unused-argument client: "distributed.Client", params: Dict[str, Any], dtrain: DaskDMatrix, @@ -995,7 +1011,12 @@ def train( # pylint: disable=unused-argument _assert_dask_support() client = _xgb_get_client(client) args = locals() - return client.sync(_train_async, global_config=config.get_config(), **args) + return client.sync( + _train_async, + global_config=config.get_config(), + dconfig=_get_dask_config(), + **args, + ) def _can_output_df(is_df: bool, output_shape: Tuple) -> bool: @@ -1693,6 +1714,7 @@ async def _fit_async( asynchronous=True, client=self.client, global_config=config.get_config(), + dconfig=_get_dask_config(), params=params, dtrain=dtrain, num_boost_round=self.get_num_boosting_rounds(), @@ -1796,6 +1818,7 @@ async def _fit_async( asynchronous=True, client=self.client, global_config=config.get_config(), + dconfig=_get_dask_config(), params=params, dtrain=dtrain, num_boost_round=self.get_num_boosting_rounds(), @@ -1987,6 +2010,7 @@ async def _fit_async( asynchronous=True, client=self.client, global_config=config.get_config(), + 
dconfig=_get_dask_config(), params=params, dtrain=dtrain, num_boost_round=self.get_num_boosting_rounds(), diff --git a/tests/python-gpu/test_gpu_with_dask.py b/tests/python-gpu/test_gpu_with_dask.py index a412badf393e..c1795c940685 100644 --- a/tests/python-gpu/test_gpu_with_dask.py +++ b/tests/python-gpu/test_gpu_with_dask.py @@ -371,7 +371,7 @@ def test_data_initialization(self, local_cuda_cluster: LocalCUDACluster) -> None m = dxgb.DaskDMatrix(client, X, y, feature_weights=fw) workers = _get_client_workers(client) - rabit_args = client.sync(dxgb._get_rabit_args, len(workers), client) + rabit_args = client.sync(dxgb._get_rabit_args, len(workers), None, client) def worker_fn(worker_addr: str, data_ref: Dict) -> None: with dxgb.RabitContext(rabit_args): @@ -473,7 +473,7 @@ def runit( with Client(local_cuda_cluster) as client: workers = _get_client_workers(client) - rabit_args = client.sync(dxgb._get_rabit_args, workers, client) + rabit_args = client.sync(dxgb._get_rabit_args, workers, None, client) futures = client.map(runit, workers, pure=False, diff --git a/tests/python/test_tracker.py b/tests/python/test_tracker.py index e86c7c72a117..0ba7199eb094 100644 --- a/tests/python/test_tracker.py +++ b/tests/python/test_tracker.py @@ -28,7 +28,7 @@ def run_rabit_ops(client, n_workers): from xgboost import rabit workers = _get_client_workers(client) - rabit_args = client.sync(_get_rabit_args, len(workers), client) + rabit_args = client.sync(_get_rabit_args, len(workers), None, client) assert not rabit.is_distributed() n_workers_from_dask = len(workers) assert n_workers == n_workers_from_dask diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py index 48c7a2f3cf74..8aebaa158ff5 100644 --- a/tests/python/test_with_dask.py +++ b/tests/python/test_with_dask.py @@ -1315,7 +1315,8 @@ def runit( with Client(cluster) as client: workers = _get_client_workers(client) rabit_args = client.sync( - xgb.dask._get_rabit_args, len(workers), client) + 
xgb.dask._get_rabit_args, len(workers), None, client + ) futures = client.map(runit, workers, pure=False, @@ -1443,7 +1444,9 @@ def test_no_duplicated_partition(self) -> None: n_partitions = X.npartitions m = xgb.dask.DaskDMatrix(client, X, y) workers = _get_client_workers(client) - rabit_args = client.sync(xgb.dask._get_rabit_args, len(workers), client) + rabit_args = client.sync( + xgb.dask._get_rabit_args, len(workers), None, client + ) n_workers = len(workers) def worker_fn(worker_addr: str, data_ref: Dict) -> None: From 137bd9587a829c35bd61ef3bfc64b47d7d8dfcbc Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 14:32:30 +0800 Subject: [PATCH 02/11] doc. --- doc/tutorials/dask.rst | 2 ++ python-package/xgboost/dask.py | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/doc/tutorials/dask.rst b/doc/tutorials/dask.rst index ae59961da8f9..465f86410df0 100644 --- a/doc/tutorials/dask.rst +++ b/doc/tutorials/dask.rst @@ -475,6 +475,8 @@ interface, including callback functions, custom evaluation metric and objective: ) +.. _tracker-ip: + *************** Tracker Host IP *************** diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index e85f8f25d509..73fe8ae07d3b 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -3,8 +3,12 @@ # pylint: disable=too-many-lines, fixme # pylint: disable=too-few-public-methods # pylint: disable=import-error -"""Dask extensions for distributed training. See :doc:`Distributed XGBoost with Dask -</tutorials/dask>` for simple tutorial. Also xgboost/demo/dask for some examples. +""" +Dask extensions for distributed training +---------------------------------------- + +See :doc:`Distributed XGBoost with Dask </tutorials/dask>` for simple tutorial. Also +:doc:`/python/dask-examples/index` for some examples. There are two sets of APIs in this module, one is the functional API including ``train`` and ``predict`` methods.
Another is stateful Scikit-Learner wrapper @@ -13,6 +17,17 @@ The implementation is heavily influenced by dask_xgboost: https://github.com/dask/dask-xgboost +Optional dask configuration +=========================== + +- **xgboost.scheduler_address**: Specify the scheduler address, see :ref:`tracker-ip`. + + .. versionadded:: 1.6.0 + + .. code-block:: python + + dask.config.set({"xgboost.scheduler_address": "192.0.0.100"}) + """ import platform import logging From 50d701d3f906a46b7420b6c1d0187b89cc53c3da Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 16:17:43 +0800 Subject: [PATCH 03/11] Bring back an old configuration. --- python-package/xgboost/dask.py | 45 ++++++++++++++++++++++++++----- python-package/xgboost/tracker.py | 3 ++- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 73fe8ae07d3b..0bca8d97df56 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -32,6 +32,7 @@ import platform import logging import collections +import socket from contextlib import contextmanager from collections import defaultdict from threading import Thread @@ -151,11 +152,25 @@ async def __aexit__(self, *args: Any, **kwargs: Any) -> None: return MultiLock -def _start_tracker(n_workers: int, host_ip: Optional[str]) -> Dict[str, Any]: - """Start Rabit tracker """ - env: Dict[str, Union[int, str]] = {'DMLC_NUM_WORKER': n_workers} - host = get_host_ip(host_ip) - rabit_context = RabitTracker(hostIP=host, n_workers=n_workers, use_logger=False) +def _start_tracker( + n_workers: int, addr_from_dask: str, addr_from_user: Optional[str] +) -> Dict[str, Any]: + """Start Rabit tracker""" + env: Dict[str, Union[int, str]] = {"DMLC_NUM_WORKER": n_workers} + try: + rabit_context = RabitTracker( + hostIP=get_host_ip(addr_from_user), n_workers=n_workers, use_logger=False + ) + except socket.error as e: + if e.errno != 99: # not a bind error + raise + LOGGER.warning( + f"Failed 
to bind address: {get_host_ip(addr_from_user)}, try" + f" {addr_from_dask} instead." + ) + rabit_context = RabitTracker( + hostIP=addr_from_dask, n_workers=n_workers, use_logger=False + ) env.update(rabit_context.worker_envs()) rabit_context.start(n_workers) @@ -823,8 +838,16 @@ def _dmatrix_from_list_of_parts( async def _get_rabit_args( n_workers: int, dconfig: Optional[Dict[str, Any]], client: "distributed.Client" ) -> List[bytes]: - """Get rabit context arguments from data distribution in DaskDMatrix.""" + """Get rabit context arguments from data distribution in DaskDMatrix. + + """ + # There are 3 possible different addresses: + # 1. Provided by user via dask.config + # 2. Guessed by xgboost `get_host_ip` function + # 3. From dask scheduler + # We try 1 and 3 if 1 is available, otherwise 2 and 3. valid_config = ["scheduler_address"] + # See if user config is available if dconfig is not None: for k in dconfig: if k not in valid_config: @@ -832,8 +855,16 @@ async def _get_rabit_args( host_ip: Optional[str] = dconfig.get("scheduler_address", None) else: host_ip = None + # Try address from dask scheduler, this might not work, see + # https://github.com/dask/dask-xgboost/pull/40 + try: + sched_addr = distributed.comm.get_address_host(client.scheduler.address) + sched_addr = sched_addr.strip("/:") + except Exception: # pylint: disable=broad-except + sched_addr = None + + env = await client.run_on_scheduler(_start_tracker, n_workers, sched_addr, host_ip) - env = await client.run_on_scheduler(_start_tracker, n_workers, host_ip) rabit_args = [f"{k}={v}".encode() for k, v in env.items()] return rabit_args diff --git a/python-package/xgboost/tracker.py b/python-package/xgboost/tracker.py index f2fc90302c0e..9e040d05b2d3 100644 --- a/python-package/xgboost/tracker.py +++ b/python-package/xgboost/tracker.py @@ -192,7 +192,8 @@ def __init__( logging.info('start listen on %s:%d', hostIP, self.port) def __del__(self) -> None: - self.sock.close() + if hasattr(self, "sock"): + 
self.sock.close() @staticmethod def get_neighbor(rank: int, n_workers: int) -> List[int]: From c51eabe809af070fc77a682a75f4625f4567319c Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 16:24:29 +0800 Subject: [PATCH 04/11] linter. --- python-package/xgboost/dask.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 0bca8d97df56..e4f302a0f1db 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -165,8 +165,9 @@ def _start_tracker( if e.errno != 99: # not a bind error raise LOGGER.warning( - f"Failed to bind address: {get_host_ip(addr_from_user)}, try" - f" {addr_from_dask} instead." + "Failed to bind address: %s, try %s instead.", + str(get_host_ip(addr_from_user)), + str(addr_from_dask), ) rabit_context = RabitTracker( hostIP=addr_from_dask, n_workers=n_workers, use_logger=False From ca332d00e6e2636f4a9aee39b152ec6e20a714e9 Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 17:32:52 +0800 Subject: [PATCH 05/11] Correct type. 
--- python-package/xgboost/dask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index e4f302a0f1db..5205643495a5 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -153,7 +153,7 @@ async def __aexit__(self, *args: Any, **kwargs: Any) -> None: def _start_tracker( - n_workers: int, addr_from_dask: str, addr_from_user: Optional[str] + n_workers: int, addr_from_dask: Optional[str], addr_from_user: Optional[str] ) -> Dict[str, Any]: """Start Rabit tracker""" env: Dict[str, Union[int, str]] = {"DMLC_NUM_WORKER": n_workers} @@ -162,7 +162,7 @@ def _start_tracker( hostIP=get_host_ip(addr_from_user), n_workers=n_workers, use_logger=False ) except socket.error as e: - if e.errno != 99: # not a bind error + if e.errno != 99 or addr_from_dask is None: # not a bind error raise LOGGER.warning( "Failed to bind address: %s, try %s instead.", From b33559dcbd928df657ea30ecf11a4336de66aad3 Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 17:44:14 +0800 Subject: [PATCH 06/11] assert is distributed. --- python-package/xgboost/dask.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 5205643495a5..dc710a988503 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -205,6 +205,7 @@ def __init__(self, args: List[bytes]) -> None: def __enter__(self) -> None: rabit.init(self.args) + assert rabit.is_distributed() LOGGER.debug('-------------- rabit say hello ------------------') def __exit__(self, *args: List) -> None: From 1c9f466c418529b491d7d76ea204465ed3b94b3f Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 17:56:46 +0800 Subject: [PATCH 07/11] msg. 
--- python-package/xgboost/dask.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index dc710a988503..1a1918700301 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -165,7 +165,7 @@ def _start_tracker( if e.errno != 99 or addr_from_dask is None: # not a bind error raise LOGGER.warning( - "Failed to bind address: %s, try %s instead.", + "Failed to bind address '%s', trying to use '%s' instead.", str(get_host_ip(addr_from_user)), str(addr_from_dask), ) From d8d7ccb32f36594ffd7cdc11ae785a1d96dbcaca Mon Sep 17 00:00:00 2001 From: fis Date: Thu, 20 Jan 2022 20:27:40 +0800 Subject: [PATCH 08/11] A simple test. --- tests/python/test_with_dask.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/python/test_with_dask.py b/tests/python/test_with_dask.py index 8aebaa158ff5..4926d9a2fafa 100644 --- a/tests/python/test_with_dask.py +++ b/tests/python/test_with_dask.py @@ -30,6 +30,7 @@ pytest.skip(msg=tm.no_dask()['reason'], allow_module_level=True) from distributed import LocalCluster, Client +import dask import dask.dataframe as dd import dask.array as da from xgboost.dask import DaskDMatrix @@ -1216,6 +1217,10 @@ def after_iteration( os.remove(before_fname) os.remove(after_fname) + with dask.config.set({'xgboost.foo': "bar"}): + with pytest.raises(ValueError): + xgb.dask.train(client, {}, dtrain, num_boost_round=4) + def run_updater_test( self, client: "Client", From 6e97a658078d3af482545655ce8f9b294858a9fe Mon Sep 17 00:00:00 2001 From: fis Date: Fri, 21 Jan 2022 01:39:50 +0800 Subject: [PATCH 09/11] Use recursive function to try binding the addresses. 
--- python-package/xgboost/dask.py | 37 +++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 1a1918700301..575e0ddd6c9c 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -152,32 +152,37 @@ async def __aexit__(self, *args: Any, **kwargs: Any) -> None: return MultiLock -def _start_tracker( - n_workers: int, addr_from_dask: Optional[str], addr_from_user: Optional[str] -) -> Dict[str, Any]: - """Start Rabit tracker""" +def _try_start_tracker( + n_workers: int, addrs: List[Optional[str]] +) -> Dict[str, Union[int, str]]: env: Dict[str, Union[int, str]] = {"DMLC_NUM_WORKER": n_workers} try: rabit_context = RabitTracker( - hostIP=get_host_ip(addr_from_user), n_workers=n_workers, use_logger=False + hostIP=get_host_ip(addrs[0]), n_workers=n_workers, use_logger=False ) + env.update(rabit_context.worker_envs()) + rabit_context.start(n_workers) + thread = Thread(target=rabit_context.join) + thread.daemon = True + thread.start() except socket.error as e: - if e.errno != 99 or addr_from_dask is None: # not a bind error + if len(addrs) < 2 or e.errno != 99: raise LOGGER.warning( "Failed to bind address '%s', trying to use '%s' instead.", - str(get_host_ip(addr_from_user)), - str(addr_from_dask), - ) - rabit_context = RabitTracker( - hostIP=addr_from_dask, n_workers=n_workers, use_logger=False + str(addrs[0]), + str(addrs[1]), ) - env.update(rabit_context.worker_envs()) + env = _try_start_tracker(n_workers, addrs[1:]) + finally: + return env + - rabit_context.start(n_workers) - thread = Thread(target=rabit_context.join) - thread.daemon = True - thread.start() +def _start_tracker( + n_workers: int, addr_from_dask: Optional[str], addr_from_user: Optional[str] +) -> Dict[str, Any]: + """Start Rabit tracker, recurse to try different addresses.""" + env = _try_start_tracker(n_workers, [get_host_ip(addr_from_user), addr_from_dask]) return 
env From 7a87d2171f604266b259e25b86c2c8bad1a120ea Mon Sep 17 00:00:00 2001 From: fis Date: Fri, 21 Jan 2022 01:47:40 +0800 Subject: [PATCH 10/11] Don't swallow the exception. --- python-package/xgboost/dask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 575e0ddd6c9c..1d3368bc5dd6 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -174,8 +174,8 @@ def _try_start_tracker( str(addrs[1]), ) env = _try_start_tracker(n_workers, addrs[1:]) - finally: - return env + + return env def _start_tracker( From 16bf957350435404b765aea9c91684c5de67e180 Mon Sep 17 00:00:00 2001 From: fis Date: Fri, 21 Jan 2022 15:02:42 +0800 Subject: [PATCH 11/11] Remove an extra `get_host_ip`. --- python-package/xgboost/dask.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python-package/xgboost/dask.py b/python-package/xgboost/dask.py index 1d3368bc5dd6..5a89d02dc665 100644 --- a/python-package/xgboost/dask.py +++ b/python-package/xgboost/dask.py @@ -180,9 +180,9 @@ def _try_start_tracker( def _start_tracker( n_workers: int, addr_from_dask: Optional[str], addr_from_user: Optional[str] -) -> Dict[str, Any]: +) -> Dict[str, Union[int, str]]: """Start Rabit tracker, recurse to try different addresses.""" - env = _try_start_tracker(n_workers, [get_host_ip(addr_from_user), addr_from_dask]) + env = _try_start_tracker(n_workers, [addr_from_user, addr_from_dask]) return env