From 36febfdadc9914a6bd75eea9f14b04e8c79df02c Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 4 Apr 2024 19:46:28 -0700 Subject: [PATCH 1/3] avoid p2p shuffle as a default --- python/dask_cudf/dask_cudf/expr/__init__.py | 3 +++ .../dask_cudf/tests/test_distributed.py | 22 ++++++++++++++++--- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/__init__.py b/python/dask_cudf/dask_cudf/expr/__init__.py index 826f514a674..a76b655ef42 100644 --- a/python/dask_cudf/dask_cudf/expr/__init__.py +++ b/python/dask_cudf/dask_cudf/expr/__init__.py @@ -8,6 +8,9 @@ # Register custom expressions and collections if QUERY_PLANNING_ON: + # Broadly avoid "p2p" and "disk" defaults for now + config.set({"dataframe.shuffle.method": "tasks"}) + try: import dask_cudf.expr._collection import dask_cudf.expr._expr diff --git a/python/dask_cudf/dask_cudf/tests/test_distributed.py b/python/dask_cudf/dask_cudf/tests/test_distributed.py index 39eadb45c91..9bfc1c25c17 100644 --- a/python/dask_cudf/dask_cudf/tests/test_distributed.py +++ b/python/dask_cudf/dask_cudf/tests/test_distributed.py @@ -16,9 +16,9 @@ dask_cuda = pytest.importorskip("dask_cuda") -def more_than_two_gpus(): +def more_than_n_gpus(n): ngpus = len(numba.cuda.gpus) - return ngpus >= 2 + return ngpus >= n @pytest.mark.parametrize("delayed", [True, False]) @@ -54,7 +54,7 @@ def test_merge(): @pytest.mark.skipif( - not more_than_two_gpus(), reason="Machine does not have more than two GPUs" + not more_than_n_gpus(2), reason="Machine does not have more than two GPUs" ) def test_ucx_seriesgroupby(): pytest.importorskip("ucp") @@ -97,3 +97,19 @@ def test_p2p_shuffle(): ddf.compute().sort_values("x"), check_index=False, ) + + +@pytest.mark.skipif( + not more_than_n_gpus(3), + reason="Machine does not have more than three GPUs", +) +def test_unique(): + with dask_cuda.LocalCUDACluster(n_workers=3) as cluster: + with Client(cluster): + df = cudf.DataFrame({"x": ["a", "b", "c", "a", "a"]}) + ddf = dask_cudf.from_cudf(df, npartitions=2) + dd.assert_eq( + df.x.unique(), + ddf.x.unique().compute(), + check_index=False, + ) From 10fded00906088fe3b65388a771f6b82ef5cf2de Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 5 Apr 2024 15:49:35 -0700 Subject: [PATCH 2/3] rename helper to at_least_n_gpus --- python/dask_cudf/dask_cudf/tests/test_distributed.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/dask_cudf/dask_cudf/tests/test_distributed.py b/python/dask_cudf/dask_cudf/tests/test_distributed.py index 9bfc1c25c17..1b113aff125 100644 --- a/python/dask_cudf/dask_cudf/tests/test_distributed.py +++ b/python/dask_cudf/dask_cudf/tests/test_distributed.py @@ -16,7 +16,7 @@ dask_cuda = pytest.importorskip("dask_cuda") -def more_than_n_gpus(n): +def at_least_n_gpus(n): ngpus = len(numba.cuda.gpus) return ngpus >= n @@ -54,7 +54,7 @@ def test_merge(): @pytest.mark.skipif( - not more_than_n_gpus(2), reason="Machine does not have more than two GPUs" + not at_least_n_gpus(2), reason="Machine does not have two GPUs" ) def test_ucx_seriesgroupby(): pytest.importorskip("ucp") @@ -100,8 +100,8 @@ def test_p2p_shuffle(): @pytest.mark.skipif( - not more_than_n_gpus(3), - reason="Machine does not have more than three GPUs", + not at_least_n_gpus(3), + reason="Machine does not have three GPUs", ) def test_unique(): with dask_cuda.LocalCUDACluster(n_workers=3) as cluster: From 609b936494ef7b697f30e59da383efd6d0741d48 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Fri, 5 Apr 2024 15:59:42 -0700 Subject: [PATCH 3/3] update comment --- python/dask_cudf/dask_cudf/tests/test_distributed.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/dask_cudf/dask_cudf/tests/test_distributed.py b/python/dask_cudf/dask_cudf/tests/test_distributed.py index 1b113aff125..07fdb25dff9 100644 --- a/python/dask_cudf/dask_cudf/tests/test_distributed.py +++ b/python/dask_cudf/dask_cudf/tests/test_distributed.py @@ -104,6 +104,9 @@ def test_p2p_shuffle(): reason="Machine does not have three GPUs", ) def test_unique(): + # Using `"p2p"` can produce dispatching problems + # TODO: Test "p2p" after dask > 2024.4.1 is required + # See: https://github.com/dask/dask/pull/11040 with dask_cuda.LocalCUDACluster(n_workers=3) as cluster: with Client(cluster): df = cudf.DataFrame({"x": ["a", "b", "c", "a", "a"]})