diff --git a/.github/workflows/update-gpuci.yaml b/.github/workflows/update-gpuci.yaml
deleted file mode 100644
index d88ae20be1..0000000000
--- a/.github/workflows/update-gpuci.yaml
+++ /dev/null
@@ -1,69 +0,0 @@
-name: Check for gpuCI updates
-
-on:
-  schedule:
-    - cron: "0 0 * * *" # Daily “At 00:00” UTC
-  workflow_dispatch:
-
-jobs:
-  update-gpuci:
-    runs-on: ubuntu-latest
-    if: github.repository == 'dask/distributed'
-
-    steps:
-      - uses: actions/checkout@v4.1.3
-
-      - name: Parse current axis YAML
-        id: rapids_current
-        uses: the-coding-turtle/ga-yaml-parser@v0.1.2
-        with:
-          file: continuous_integration/gpuci/axis.yaml
-
-      - name: Get latest cuDF nightly version
-        id: cudf_latest
-        uses: jacobtomlinson/gha-anaconda-package-version@0.1.4
-        with:
-          org: "rapidsai-nightly"
-          package: "cudf"
-          version_system: "CalVer"
-
-      - name: Get latest UCX-Py nightly version
-        id: ucx_py_latest
-        uses: jacobtomlinson/gha-anaconda-package-version@0.1.4
-        with:
-          org: "rapidsai-nightly"
-          package: "ucx-py"
-          version_system: "CalVer"
-
-      - name: Get old RAPIDS / UCX-Py versions
-        env:
-          FULL_RAPIDS_VER: ${{ steps.cudf_latest.outputs.version }}
-          FULL_UCX_PY_VER: ${{ steps.ucx_py_latest.outputs.version }}
-        run: |
-          echo RAPIDS_VER=${{ steps.rapids_current.outputs.RAPIDS_VER_0 }} >> $GITHUB_ENV
-          echo UCX_PY_VER=$(curl -sL https://version.gpuci.io/rapids/${{ steps.rapids_current.outputs.RAPIDS_VER_0 }}) >> $GITHUB_ENV
-          echo NEW_RAPIDS_VER=$(echo $FULL_RAPIDS_VER | cut -d'.' -f1,2) >> $GITHUB_ENV
-          echo NEW_UCX_PY_VER=$(echo $FULL_UCX_PY_VER | cut -d'.' -f1,2) >> $GITHUB_ENV
-
-      - name: Update RAPIDS version
-        uses: jacobtomlinson/gha-find-replace@v3
-        with:
-          include: 'continuous_integration\/gpuci\/axis\.yaml'
-          find: "${{ env.RAPIDS_VER }}"
-          replace: "${{ env.NEW_RAPIDS_VER }}"
-          regex: false
-
-      - name: Create Pull Request
-        uses: peter-evans/create-pull-request@v6
-        if: ${{ env.UCX_PY_VER != env.NEW_UCX_PY_VER }} # make sure new ucx-py nightlies are available
-        with:
-          token: ${{ secrets.GITHUB_TOKEN }}
-          draft: false
-          commit-message: "Update gpuCI `RAPIDS_VER` to `${{ env.NEW_RAPIDS_VER }}`"
-          title: "Update gpuCI `RAPIDS_VER` to `${{ env.NEW_RAPIDS_VER }}`"
-          author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
-          branch: "upgrade-gpuci-rapids"
-          body: |
-            New cuDF and ucx-py nightly versions have been detected.
-
-            Updated `axis.yaml` to use `${{ env.NEW_RAPIDS_VER }}`.
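Note on the deleted workflow above: its core logic trimmed full CalVer nightly versions down to the `major.minor` form stored in `axis.yaml` via `cut -d'.' -f1,2`, and only opened a PR once a matching ucx-py nightly existed. A minimal Python sketch of that version trimming (the example version strings are illustrative, not actual workflow output):

```python
def to_axis_ver(full_ver: str) -> str:
    """Reduce a full CalVer package version to the major.minor form
    used by RAPIDS_VER in axis.yaml (mirrors `cut -d'.' -f1,2`)."""
    return ".".join(full_ver.split(".")[:2])


# Hypothetical nightly version strings, for illustration only:
assert to_axis_ver("24.12.00a241105") == "24.12"
assert to_axis_ver("0.41.0a241105") == "0.41"
```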
diff --git a/CODEOWNERS b/CODEOWNERS
index 08ebf86238..38c8a480a8 100644
--- a/CODEOWNERS
+++ b/CODEOWNERS
@@ -12,4 +12,3 @@ distributed/widgets/* @jacobtomlinson

 # GPU Support
 distributed/diagnostics/nvml.py @jacobtomlinson @quasiben
-continuous_integration/gpuci/* @jacobtomlinson @quasiben
diff --git a/continuous_integration/gpuci/axis.yaml b/continuous_integration/gpuci/axis.yaml
deleted file mode 100644
index 41fca92d9f..0000000000
--- a/continuous_integration/gpuci/axis.yaml
+++ /dev/null
@@ -1,14 +0,0 @@
-PYTHON_VER:
-- "3.10"
-- "3.11"
-
-CUDA_VER:
-- "11.8.0"
-
-LINUX_VER:
-- ubuntu20.04
-
-RAPIDS_VER:
-- "24.12"
-
-excludes:
diff --git a/continuous_integration/gpuci/build.sh b/continuous_integration/gpuci/build.sh
deleted file mode 100644
index ab3bc95c48..0000000000
--- a/continuous_integration/gpuci/build.sh
+++ /dev/null
@@ -1,62 +0,0 @@
-##############################################
-# Dask GPU build and test script for CI     #
-##############################################
-set -e
-NUMARGS=$#
-ARGS=$*
-
-# Arg parsing function
-function hasArg {
-    (( ${NUMARGS} != 0 )) && (echo " ${ARGS} " | grep -q " $1 ")
-}
-
-# Set path and build parallel level
-export PATH=/opt/conda/bin:/usr/local/cuda/bin:$PATH
-export PARALLEL_LEVEL=${PARALLEL_LEVEL:-4}
-
-# Set home to the job's workspace
-export HOME="$WORKSPACE"
-
-# Switch to project root; also root of repo checkout
-cd "$WORKSPACE"
-
-# Determine CUDA release version
-export CUDA_REL=${CUDA_VERSION%.*}
-
-# FIXME - monitoring GIL contention causes UCX teardown issues
-export DASK_DISTRIBUTED__ADMIN__SYSTEM_MONITOR__GIL__ENABLED=False
-
-################################################################################
-# SETUP - Check environment
-################################################################################
-
-rapids-logger "Check environment variables"
-env
-
-rapids-logger "Check GPU usage"
-nvidia-smi
-
-rapids-logger "Activate conda env"
-. /opt/conda/etc/profile.d/conda.sh
-conda activate dask
-
-rapids-logger "Install distributed"
-python -m pip install -e .
-
-rapids-logger "Install dask"
-python -m pip install git+https://github.com/dask/dask
-
-rapids-logger "Check Python versions"
-python --version
-
-rapids-logger "Check conda environment"
-conda info
-conda config --show-sources
-conda list --show-channel-urls
-
-rapids-logger "Python py.test for distributed"
-py.test distributed -v -m gpu --runslow --junitxml="$WORKSPACE/junit-distributed.xml"
-
-# cuDF spill stats monitoring must be enabled for this test
-CUDF_SPILL=on CUDF_SPILL_STATS=1 DASK_DISTRIBUTED__DIAGNOSTICS__CUDF=1 \
-  py.test distributed/diagnostics/tests/test_cudf_diagnostics.py -v -m gpu --runslow --junitxml="$WORKSPACE/junit-distributed.xml"
diff --git a/distributed/client.py b/distributed/client.py
index be89e2cf8d..c93f5ff8e5 100644
--- a/distributed/client.py
+++ b/distributed/client.py
@@ -70,7 +70,7 @@
 from tornado import gen
 from tornado.ioloop import IOLoop

-from dask._task_spec import DataNode, GraphNode, Task, TaskRef
+from dask._task_spec import DataNode, GraphNode, Task, TaskRef, parse_input

 import distributed.utils
 from distributed import cluster_dump, preloading
@@ -622,6 +622,9 @@ def __await__(self):
     def __hash__(self):
         return hash(self._id)

+    def __eq__(self, other):
+        return self is other
+

 class FutureState:
     """A Future's internal state.
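Note on the `Future.__eq__` addition in `distributed/client.py` above: `__hash__` is derived from `self._id` while equality is identity-based, so two `Future` instances sharing an id hash alike yet remain distinct entries in dicts and sets. A standalone sketch of these semantics (toy class for illustration, not the real `Future`):

```python
class FutureLike:
    """Illustrates hash-by-id, equality-by-identity semantics."""

    def __init__(self, id_: str) -> None:
        self._id = id_

    def __hash__(self) -> int:
        return hash(self._id)

    def __eq__(self, other: object) -> bool:
        return self is other


f1, f2 = FutureLike("key-1"), FutureLike("key-1")
assert hash(f1) == hash(f2)  # same hash bucket...
assert f1 != f2              # ...but never equal to a different instance
assert len({f1, f2}) == 2    # so both survive in a set
```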
@@ -850,12 +853,10 @@ def __init__(
         **kwargs,
     ):
         self.func: Callable = func
-        self.iterables: Iterable[Any] = (
-            list(zip(*zip(*iterables))) if _is_nested(iterables) else [iterables]
-        )
+        self.iterables = [tuple(map(parse_input, iterable)) for iterable in iterables]
         self.key: str | Iterable[str] | None = key
         self.pure: bool = pure
-        self.kwargs = kwargs
+        self.kwargs = {k: parse_input(v) for k, v in kwargs.items()}
         super().__init__(annotations=annotations)

     def __repr__(self) -> str:
@@ -2163,13 +2164,12 @@ def submit(
         if isinstance(workers, (str, Number)):
             workers = [workers]

-
         dsk = {
             key: Task(
                 key,
                 func,
-                *args,
-                **kwargs,
+                *(parse_input(a) for a in args),
+                **{k: parse_input(v) for k, v in kwargs.items()},
             )
         }
         futures = self._graph_to_futures(
diff --git a/distributed/scheduler.py b/distributed/scheduler.py
index 74b13ed61b..f8a26d59d4 100644
--- a/distributed/scheduler.py
+++ b/distributed/scheduler.py
@@ -4571,7 +4571,7 @@ async def add_worker(
         )
         self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)

-        logger.info("Register worker %s", ws)
+        logger.info("Register worker addr: %s name: %s", ws.address, ws.name)

         msg = {
             "status": "OK",
@@ -5428,7 +5428,9 @@ async def remove_worker(
         ws = self.workers[address]

-        logger.info(f"Remove worker {ws} ({stimulus_id=})")
+        logger.info(
+            f"Remove worker addr: {ws.address} name: {ws.name} ({stimulus_id=})"
+        )

         if close:
             with suppress(AttributeError, CommClosedError):
                 self.stream_comms[address].send(
@@ -7548,7 +7550,11 @@ async def retire_workers(
             names_set = {str(name) for name in names}
             wss = {ws for ws in self.workers.values() if str(ws.name) in names_set}
         elif workers is not None:
-            logger.info("Retire worker addresses %s", workers)
+            logger.info(
+                "Retire worker addresses (stimulus_id='%s') %s",
+                stimulus_id,
+                workers,
+            )
             wss = {
                 self.workers[address]
                 for address in workers
@@ -7572,8 +7578,6 @@
         try:
             coros = []
             for ws in wss:
-                logger.info(f"Retiring worker {ws.address!r} ({stimulus_id=!r})")
-
                 policy = RetireWorker(ws.address)
                 amm.add_policy(policy)

diff --git a/distributed/shuffle/_core.py b/distributed/shuffle/_core.py
index c782d56e38..a3f9ce327d 100644
--- a/distributed/shuffle/_core.py
+++ b/distributed/shuffle/_core.py
@@ -25,7 +25,7 @@
 from tornado.ioloop import IOLoop

 import dask.config
-from dask._task_spec import Task, _inline_recursively
+from dask._task_spec import Task
 from dask.core import flatten
 from dask.typing import Key
 from dask.utils import parse_bytes, parse_timedelta
@@ -569,7 +569,7 @@ def _mean_shard_size(shards: Iterable) -> int:
     return size // count if count else 0


-def p2p_barrier(id: ShuffleId, run_ids: list[int]) -> int:
+def p2p_barrier(id: ShuffleId, *run_ids: int) -> int:
     try:
         return get_worker_plugin().barrier(id, run_ids)
     except Reschedule as e:
@@ -599,18 +599,9 @@ def __init__(
         self.spec = spec
         super().__init__(key, func, *args, **kwargs)

-    def copy(self) -> P2PBarrierTask:
-        return P2PBarrierTask(
-            self.key, self.func, *self.args, spec=self.spec, **self.kwargs
-        )
-
     def __repr__(self) -> str:
         return f"P2PBarrierTask({self.key!r})"

-    def inline(self, dsk: dict[Key, Any]) -> P2PBarrierTask:
-        new_args = _inline_recursively(self.args, dsk)
-        new_kwargs = _inline_recursively(self.kwargs, dsk)
-        assert self.func is not None
-        return P2PBarrierTask(
-            self.key, self.func, *new_args, spec=self.spec, **new_kwargs
-        )
+    @property
+    def block_fusion(self) -> bool:
+        return True
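Note on the `p2p_barrier` signature change above: taking `*run_ids` instead of a single list argument means call sites splat the transfer keys into the barrier task's positional arguments (see the `*transfer_keys` hunks below), letting the task machinery resolve each key as its own dependency. A toy sketch of the calling convention (stand-in function, not the real task-spec machinery):

```python
# Sketch of why the barrier takes *run_ids rather than a list: each
# positional argument can be tracked and resolved individually by the
# graph machinery before the call happens.
def p2p_barrier_like(shuffle_id: str, *run_ids: int) -> str:
    # The worker plugin receives the resolved run ids as a tuple.
    assert all(isinstance(r, int) for r in run_ids)
    return shuffle_id


transfer_run_ids = [1, 1, 2]  # hypothetical per-transfer run ids
# Call sites now unpack, mirroring `*transfer_keys` in the hunks below:
assert p2p_barrier_like("shuffle-abc", *transfer_run_ids) == "shuffle-abc"
```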
diff --git a/distributed/shuffle/_merge.py b/distributed/shuffle/_merge.py
index c7e62d5558..bf7a532e80 100644
--- a/distributed/shuffle/_merge.py
+++ b/distributed/shuffle/_merge.py
@@ -418,7 +418,7 @@ def _construct_graph(self) -> _T_LowLevelGraph:
             _barrier_key_left,
             p2p_barrier,
             token_left,
-            transfer_keys_left,
+            *transfer_keys_left,
             spec=DataFrameShuffleSpec(
                 id=shuffle_id_left,
                 npartitions=self.npartitions,
@@ -435,7 +435,7 @@ def _construct_graph(self) -> _T_LowLevelGraph:
             _barrier_key_right,
             p2p_barrier,
             token_right,
-            transfer_keys_right,
+            *transfer_keys_right,
             spec=DataFrameShuffleSpec(
                 id=shuffle_id_right,
                 npartitions=self.npartitions,
diff --git a/distributed/shuffle/_rechunk.py b/distributed/shuffle/_rechunk.py
index 354828415b..8b597f79e4 100644
--- a/distributed/shuffle/_rechunk.py
+++ b/distributed/shuffle/_rechunk.py
@@ -121,7 +121,7 @@

 import dask
 import dask.config
-from dask._task_spec import Task, TaskRef
+from dask._task_spec import Task, TaskRef, parse_input
 from dask.highlevelgraph import HighLevelGraph
 from dask.layers import Layer
 from dask.tokenize import tokenize
@@ -756,7 +756,9 @@ def partial_concatenate(
             rec_cat_arg[old_partial_index] = TaskRef((input_name,) + old_global_index)

     concat_task = Task(
-        (rechunk_name(token),) + global_new_index, concatenate3, rec_cat_arg.tolist()
+        (rechunk_name(token),) + global_new_index,
+        concatenate3,
+        parse_input(rec_cat_arg.tolist()),
     )
     dsk[concat_task.key] = concat_task
     return dsk
@@ -822,7 +824,7 @@ def partial_rechunk(
         _barrier_key,
         p2p_barrier,
         partial_token,
-        transfer_keys,
+        *transfer_keys,
         spec=ArrayRechunkSpec(
             id=ShuffleId(partial_token), new=partial_new, old=partial_old, disk=disk
         ),
diff --git a/distributed/shuffle/_shuffle.py b/distributed/shuffle/_shuffle.py
index 912259e4a0..6f78a65042 100644
--- a/distributed/shuffle/_shuffle.py
+++ b/distributed/shuffle/_shuffle.py
@@ -275,7 +275,7 @@ def _construct_graph(self) -> _T_LowLevelGraph:
             _barrier_key,
             p2p_barrier,
             token,
-            transfer_keys,
+            *transfer_keys,
             spec=DataFrameShuffleSpec(
                 id=shuffle_id,
                 npartitions=self.npartitions,
diff --git a/distributed/tests/test_as_completed.py b/distributed/tests/test_as_completed.py
index 27b5b18c94..b2865a3695 100644
--- a/distributed/tests/test_as_completed.py
+++ b/distributed/tests/test_as_completed.py
@@ -131,20 +131,21 @@ def test_as_completed_is_empty(client):
     assert ac.is_empty()


-def test_as_completed_cancel(client):
-    x = client.submit(inc, 1)
-    y = client.submit(inc, 1)
+@gen_cluster(client=True)
+async def test_as_completed_cancel(c, s, a, b):
+    x = c.submit(inc, 1)
+    y = c.submit(inc, 1)

     ac = as_completed([x, y])
-    x.cancel()
+    await x.cancel()

-    assert next(ac) is x or y
-    assert next(ac) is y or x
+    async for fut in ac:
+        assert fut is y or fut is x

     with pytest.raises(queue.Empty):
         ac.queue.get(timeout=0.1)

-    res = list(as_completed([x, y, x]))
+    res = [fut async for fut in as_completed([x, y, x])]
     assert len(res) == 3
     assert set(res) == {x, y}
     assert res.count(x) == 2
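Note on the `test_as_completed_cancel` rewrite above: under `@gen_cluster` the test is a coroutine, so `Future.cancel()` is awaited and `as_completed` is consumed with `async for`. A minimal self-contained sketch of the same pattern against an in-process client (cluster parameters are illustrative):

```python
import asyncio

from dask.distributed import Client, as_completed


def inc(x):
    return x + 1


async def main():
    async with Client(processes=False, asynchronous=True) as client:
        futures = [client.submit(inc, i) for i in range(3)]
        # In an async context, as_completed supports async iteration and
        # yields futures in completion order, not submission order.
        async for fut in as_completed(futures):
            print(fut.key, await fut)


asyncio.run(main())
```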
f"Retired worker '{a.address}' (stimulus_id='graceful')", # Aborted graceful - f"Retire worker addresses ['{b.address}']", - f"Retiring worker '{b.address}' (stimulus_id='graceful_abort')", + f"Retire worker addresses (stimulus_id='graceful_abort') ['{b.address}']", f"Could not retire worker '{b.address}': unique data could not be " "moved to any other worker (stimulus_id='graceful_abort')", # Ungraceful - f"Remove worker (stimulus_id='ungraceful')", + f"Remove worker addr: {b.address} name: {b.name} (stimulus_id='ungraceful')", f"Removing worker '{b.address}' caused the cluster to lose already " "computed task(s), which will be recomputed elsewhere: {'x'} " "(stimulus_id='ungraceful')",