diff --git a/AB_environments/AB_sample.cluster.yaml b/AB_environments/AB_sample.cluster.yaml index b9e7e1cebb..db799b4b61 100644 --- a/AB_environments/AB_sample.cluster.yaml +++ b/AB_environments/AB_sample.cluster.yaml @@ -9,6 +9,6 @@ # Overrides ../cluster_kwargs.yaml. # Leave empty if you don't want to override anything. -# small_cluster: +# small: # n_workers: 5 # worker_vm_types: [m6i.xlarge] # 4CPU, 16GiB diff --git a/cluster_kwargs.yaml b/cluster_kwargs.yaml index 65677d57e0..b48ea1b73f 100644 --- a/cluster_kwargs.yaml +++ b/cluster_kwargs.yaml @@ -20,18 +20,18 @@ default: spot_on_demand_fallback: true multizone: true -# For all tests using the small_client fixture -small_cluster: +# For all tests using the @pytest.mark.client("small") fixture +small: n_workers: 10 worker_vm_types: [m6i.large] # 2CPU, 8GiB # For tests/benchmarks/test_parquet.py -parquet_cluster: +parquet: n_workers: 15 worker_vm_types: [m5.xlarge] # 4 CPU, 16 GiB # For tests/benchmarks/test_spill.py -spill_cluster: +spill: n_workers: 5 worker_disk_size: 64 worker_vm_types: [m6i.large] # 2CPU, 8GiB @@ -72,6 +72,12 @@ snowflake: n_workers: 20 worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance) +# For tests/workflows/test_from_csv_to_parquet.py +from_csv_to_parquet: + n_workers: 10 + worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance) + backend_options: + region: "us-east-1" # Same region as dataset # Specific tests test_work_stealing_on_scaling_up: @@ -85,10 +91,3 @@ test_work_stealing_on_straggling_worker: test_repeated_merge_spill: n_workers: 20 worker_vm_types: [m6i.large] - -# For tests/workflows/test_from_csv_to_parquet.py -from_csv_to_parquet: - n_workers: 10 - worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance) - backend_options: - region: "us-east-1" # Same region as dataset diff --git a/tests/benchmarks/test_array.py b/tests/benchmarks/test_array.py index e2b5058044..9a9038d086 100644 --- a/tests/benchmarks/test_array.py +++ b/tests/benchmarks/test_array.py @@ -18,11 +18,12 @@ ) -def test_anom_mean(small_client, new_array): +@pytest.mark.client("small") +def test_anom_mean(client, new_array): """From https://github.com/dask/distributed/issues/2602#issuecomment-498718651""" xarray = pytest.importorskip("xarray") - memory = cluster_memory(small_client) # 76.66 GiB + memory = cluster_memory(client) # 76.66 GiB target_nbytes = memory // 2 data = new_array( scaled_array_shape(target_nbytes, ("x", "10MiB")), @@ -42,9 +43,10 @@ def test_anom_mean(small_client, new_array): anom = arr.groupby("day") - clim anom_mean = anom.mean(dim="time") - wait(anom_mean, small_client, 10 * 60) + wait(anom_mean, client, 10 * 60) +@pytest.mark.client("small") @pytest.mark.parametrize( "speed,chunk_shape", [ @@ -53,7 +55,7 @@ def test_anom_mean(small_client, new_array): ("slow", "square"), ], ) -def test_basic_sum(small_client, speed, chunk_shape): +def test_basic_sum(client, speed, chunk_shape): """From https://github.com/dask/distributed/pull/4864 n-step map-reduce: @@ -90,7 +92,7 @@ def test_basic_sum(small_client, speed, chunk_shape): else: chunks = (3350, 3925) # 100.32 MiB square-ish chunks - memory = cluster_memory(small_client) # 76.66 GiB + memory = cluster_memory(client) # 76.66 GiB target_nbytes = memory * 5 data = da.zeros( scaled_array_shape(target_nbytes, ("100MiB", "x")), @@ -111,17 +113,18 @@ def slow_map(x): result = da.sum(data, axis=1) - wait(result, small_client, 10 * 60) + wait(result, client, 10 * 60) @pytest.mark.skip( "fails in actual CI; see https://github.com/coiled/benchmarks/issues/253" ) -def test_climatic_mean(small_client, new_array): +@pytest.mark.client("small") +def test_climatic_mean(client, new_array): """From https://github.com/dask/distributed/issues/2602#issuecomment-535009454""" xarray = pytest.importorskip("xarray") - memory = cluster_memory(small_client) # 76.66 GiB + memory = cluster_memory(client) # 76.66 GiB target_nbytes = memory * 2 chunks = (1, 1, 96, 21, 90, 144) shape = (28, "x", 96, 21, 90, 144) @@ -138,11 +141,12 @@ def test_climatic_mean(small_client, new_array): # arr_clim = array.groupby("init_date.month").mean(dim="init_date") arr_clim = array.groupby("init_date").mean(dim="init_date") - wait(arr_clim, small_client, 15 * 60) + wait(arr_clim, client, 15 * 60) -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_quadratic_mean(small_client): +@pytest.mark.client("small") +@run_up_to_nthreads("small", 50, reason="fixed dataset") +def test_quadratic_mean(client): # https://github.com/pangeo-data/distributed-array-examples/issues/2 xr = pytest.importorskip("xarray") @@ -163,13 +167,14 @@ def test_quadratic_mean(small_client): quad["uv"] = ds.anom_u * ds.anom_v mean = quad.mean("time") # Mean is really small at this point so we can just fetch it - wait(mean, small_client, 15 * 60) + wait(mean, client, 15 * 60) -def test_vorticity(small_client, new_array): +@pytest.mark.client("small") +def test_vorticity(client, new_array): # From https://github.com/dask/distributed/issues/6571 - memory = cluster_memory(small_client) # 76.66 GiB + memory = cluster_memory(client) # 76.66 GiB target_nbytes = int(memory * 0.85) shape = scaled_array_shape(target_nbytes, (5000, 5000, "x")) @@ -209,12 +214,13 @@ def pad_rechunk(arr): vp = pad_rechunk(v) result = dx[..., None] * up - dy[..., None] * vp - wait(arr_to_devnull(result), small_client, 10 * 60) + wait(arr_to_devnull(result), client, 10 * 60) -def test_double_diff(small_client, new_array): +@pytest.mark.client("small") +def test_double_diff(client, new_array): # Variant of https://github.com/dask/distributed/issues/6597 - memory = cluster_memory(small_client) # 76.66 GiB + memory = cluster_memory(client) # 76.66 GiB # FIXME https://github.com/coiled/benchmarks/issues/564 # this algorithm is supposed to scale linearly! shape = scaled_array_shape_quadratic(memory, "76.66 GiB", ("x", "x")) @@ -224,24 +230,26 @@ def test_double_diff(small_client, new_array): print_size_info(memory, memory, a, b) diff = a[1:, 1:] - b[:-1, :-1] - wait(arr_to_devnull(diff), small_client, 10 * 60) + wait(arr_to_devnull(diff), client, 10 * 60) -def test_dot_product(small_client, new_array): +@pytest.mark.client("small") +def test_dot_product(client, new_array): """See also test_spill.py::test_dot_product_spill for variant that hits the spill threshold """ - memory = cluster_memory(small_client) # 76.66 GiB + memory = cluster_memory(client) # 76.66 GiB shape = scaled_array_shape_quadratic(memory // 17, "4.5 GiB", ("x", "x")) a = new_array(shape, chunks="128 MiB") print_size_info(memory, memory // 17, a) # Input 1: 4.51 GiB - 49 128.00 MiB chunks b = (a @ a.T).sum() - wait(b, small_client, 10 * 60) + wait(b, client, 10 * 60) -def test_map_overlap_sample(small_client, new_array): +@pytest.mark.client("small") +def test_map_overlap_sample(client, new_array): """ This is from Napari like workloads where they have large images and commonly use map_overlap. They care about rapid (sub-second) access to @@ -255,30 +263,34 @@ def test_map_overlap_sample(small_client, new_array): y[5000:5010, 5000:5010].compute() -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_rechunk_in_memory(small_client, configure_rechunking): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_rechunk_in_memory(client, configure_rechunking): rng = da.random.default_rng() x = rng.random((50000, 50000)) x.rechunk((50000, 20)).rechunk((20, 50000)).sum().compute() -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_rechunk_striping(small_client, configure_rechunking): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_rechunk_striping(client, configure_rechunking): rng = da.random.default_rng() x = rng.random((100_000, 100_000)) x.rechunk((100_000, 100)).rechunk((100, 100_000)).sum().compute() # ~76 MiB chunks -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_rechunk_swap_axes(small_client, configure_rechunking): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_rechunk_swap_axes(client, configure_rechunking): rng = da.random.default_rng() x = rng.random((100_000, 100_000), chunks=(100_000, 100)) x.rechunk((100, 100_000)).sum().compute() # ~76 MiB chunks -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") +@run_up_to_nthreads("small", 50, reason="fixed dataset") @pytest.mark.skip(reason="this runs forever") -def test_rechunk_out_of_memory(small_client, configure_rechunking): +@pytest.mark.client("small") +def test_rechunk_out_of_memory(client, configure_rechunking): rng = da.random.default_rng() x = rng.random((100000, 100000)) x.rechunk((50000, 20)).rechunk((20, 50000)).sum().compute() diff --git a/tests/benchmarks/test_csv.py b/tests/benchmarks/test_csv.py index 53b5975094..d38bd15ff4 100644 --- a/tests/benchmarks/test_csv.py +++ b/tests/benchmarks/test_csv.py @@ -1,11 +1,13 @@ import dask.dataframe as dd import pandas as pd +import pytest from ..utils_test import run_up_to_nthreads -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_csv_basic(small_client): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_csv_basic(client): ddf = dd.read_csv( "s3://coiled-runtime-ci/nyc-tlc/yellow_tripdata_2019_csv/yellow_tripdata_2019-*.csv", dtype={ diff --git a/tests/benchmarks/test_custom.py b/tests/benchmarks/test_custom.py index f2f6362e5e..17c90f83be 100644 --- a/tests/benchmarks/test_custom.py +++ b/tests/benchmarks/test_custom.py @@ -1,17 +1,17 @@ import random import time +import pytest from dask import delayed from dask.utils import parse_bytes from ..utils_test import wait -def test_jobqueue(small_client): +@pytest.mark.client("small") +def test_jobqueue(client): # Just using dask to run lots of embarrassingly-parallel CPU-bound tasks as fast as possible - nthreads = sum( - w["nthreads"] for w in small_client.scheduler_info()["workers"].values() - ) + nthreads = sum(w["nthreads"] for w in client.scheduler_info()["workers"].values()) max_runtime = 120 max_sleep = 3 n_tasks = round(max_runtime / max_sleep * nthreads) @@ -26,8 +26,4 @@ def task(i: int) -> int: tasks = [task(i) for i in range(n_tasks)] result = delayed(sum)(tasks) # just so we have a single object - wait( - result, - small_client, - max_runtime * 1.15, - ) + wait(result, client, max_runtime * 1.15) diff --git a/tests/benchmarks/test_dataframe.py b/tests/benchmarks/test_dataframe.py index 52e3187a7b..e7a1752d00 100644 --- a/tests/benchmarks/test_dataframe.py +++ b/tests/benchmarks/test_dataframe.py @@ -1,3 +1,4 @@ +import pytest from dask.sizeof import sizeof from dask.utils import format_bytes @@ -15,8 +16,9 @@ def print_dataframe_info(df): ) -def test_dataframe_align(small_client): - memory = cluster_memory(small_client) # 76.66 GiB +@pytest.mark.client("small") +def test_dataframe_align(client): + memory = cluster_memory(client) # 76.66 GiB df = timeseries_of_size( memory // 2, @@ -39,11 +41,12 @@ def test_dataframe_align(small_client): # ~25,488,000 rows x 100 columns, 19.18 GiB total, 354 55.48 MiB partitions final = (df2 - df).mean() # will be all NaN, just forcing alignment - wait(final, small_client, 10 * 60) + wait(final, client, 10 * 60) -def test_shuffle(small_client, configure_shuffling, memory_multiplier): - memory = cluster_memory(small_client) # 76.66 GiB +@pytest.mark.client("small") +def test_shuffle(client, configure_shuffling, memory_multiplier): + memory = cluster_memory(client) # 76.66 GiB df = timeseries_of_size( memory * memory_multiplier, @@ -57,20 +60,22 @@ def test_shuffle(small_client, configure_shuffling, memory_multiplier): shuf = df.shuffle("0").map_partitions(lambda x: x) result = shuf.size - wait(result, small_client, 20 * 60) + wait(result, client, 20 * 60) -def test_filter(small_client): +@pytest.mark.client("small") +def test_filter(client): """How fast can we filter a DataFrame?""" - memory = cluster_memory(small_client) + memory = cluster_memory(client) df = timeseries_of_size(memory) name = df.head(1).name.iloc[0] # Get first name that appears result = df[df.name == name] - wait(result, small_client, 10 * 60) + wait(result, client, 10 * 60) -def test_dataframe_cow_chain(small_client): - memory = cluster_memory(small_client) # 76.66 GiB +@pytest.mark.client("small") +def test_dataframe_cow_chain(client): + memory = cluster_memory(client) # 76.66 GiB df = timeseries_of_size( memory // 2, @@ -92,4 +97,4 @@ def test_dataframe_cow_chain(small_client): .astype({50: "float"}) .loc[:, slice(2, 100)] ) - wait(result, small_client, 10 * 60) + wait(result, client, 10 * 60) diff --git a/tests/benchmarks/test_futures.py b/tests/benchmarks/test_futures.py index ffc490836e..30f6f5c1cd 100644 --- a/tests/benchmarks/test_futures.py +++ b/tests/benchmarks/test_futures.py @@ -6,57 +6,61 @@ from ..utils_test import run_up_to_nthreads -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_single_future(small_client): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_single_future(client): """How quickly can we run a simple computation? Repeat the test a few times to get a more sensible cumulative measure. """ for i in range(100): - small_client.submit(inc, i).result() + client.submit(inc, i).result() -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") @pytest.mark.parametrize("rootish", ["rootish", "non-rootish"]) -def test_large_map(small_client, rootish): +def test_large_map(client, rootish): """What's the overhead of map these days?""" if rootish == "rootish": - futures = small_client.map(inc, range(100_000)) + futures = client.map(inc, range(100_000)) else: def inc_with_deps(i, deps): return i + 1 - deps = small_client.map(inc, range(5)) - futures = small_client.map(inc_with_deps, range(100_000), deps=deps) + deps = client.map(inc, range(5)) + futures = client.map(inc_with_deps, range(100_000), deps=deps) wait(futures) -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") +@run_up_to_nthreads("small", 50, reason="fixed dataset") @pytest.mark.skip( reason="Skip until https://github.com/coiled/benchmarks/issues/521 is fixed" ) -def test_large_map_first_work(small_client): +@pytest.mark.client("small") +def test_large_map_first_work(client): """ Large maps are fine, but it's pleasant to see work start immediately. We have a batch_size keyword that should work here but it's not on by default. Maybe it should be. """ - futures = small_client.map(inc, range(100_000)) + futures = client.map(inc, range(100_000)) for _ in as_completed(futures): return -@run_up_to_nthreads("small_cluster", 100, reason="fixed dataset") -def test_memory_efficient(small_client): +@run_up_to_nthreads("small", 100, reason="fixed dataset") +@pytest.mark.client("small") +def test_memory_efficient(client): """ We hope that we pipeline xs->ys->zs without keeping all of the xs in memory to start. This may not actually happen today. """ - xs = small_client.map(np.random.random, [20_000_000] * 100, pure=False) - ys = small_client.map(slowinc, xs, delay=1) - zs = small_client.map(slowdec, ys, delay=1) + xs = client.map(np.random.random, [20_000_000] * 100, pure=False) + ys = client.map(slowinc, xs, delay=1) + zs = client.map(slowdec, ys, delay=1) futures = as_completed(zs) del xs, ys, zs # Don't keep references to intermediate results diff --git a/tests/benchmarks/test_h2o.py b/tests/benchmarks/test_h2o.py index 58683ce07f..2eaf93ff26 100644 --- a/tests/benchmarks/test_h2o.py +++ b/tests/benchmarks/test_h2o.py @@ -38,7 +38,8 @@ @pytest.fixture(params=list(DATASETS)) -def ddf(request, small_client): +@pytest.mark.client("small") +def ddf(request, client): if request.param not in enabled_datasets: raise pytest.skip("Disabled by default config or H2O_DATASETS env variable") @@ -49,7 +50,7 @@ def ddf(request, small_client): # 500 GB -> 10,000 files max_threads = max(20, int(n_gib * 20)) run_up_to_nthreads( - "small_cluster", max_threads, reason="fixed data size", as_decorator=False + "small", max_threads, reason="fixed data size", as_decorator=False ) uri = DATASETS[request.param] diff --git a/tests/benchmarks/test_join.py b/tests/benchmarks/test_join.py index fecf038314..5b86121ccb 100644 --- a/tests/benchmarks/test_join.py +++ b/tests/benchmarks/test_join.py @@ -4,9 +4,10 @@ from ..utils_test import cluster_memory, run_up_to_nthreads, timeseries_of_size, wait -@run_up_to_nthreads("small_cluster", 40, reason="Does not finish") -def test_join_big(small_client, memory_multiplier, configure_shuffling): - memory = cluster_memory(small_client) # 76.66 GiB +@run_up_to_nthreads("small", 40, reason="Does not finish") +@pytest.mark.client("small") +def test_join_big(client, memory_multiplier, configure_shuffling): + memory = cluster_memory(client) # 76.66 GiB df1_big = timeseries_of_size( memory * memory_multiplier, dtypes={str(i): float for i in range(100)} @@ -24,14 +25,15 @@ def test_join_big(small_client, memory_multiplier, configure_shuffling): join = df1_big.merge(df2_big, on="predicate", how="inner") result = join.size - wait(result, small_client, 20 * 60) + wait(result, client, 20 * 60) -def test_join_big_small(small_client, memory_multiplier, configure_shuffling): +@pytest.mark.client("small") +def test_join_big_small(client, memory_multiplier, configure_shuffling): if memory_multiplier == 0.1: raise pytest.skip(reason="Too noisy; not adding anything to multiplier=1") - memory = cluster_memory(small_client) # 76.66 GiB + memory = cluster_memory(client) # 76.66 GiB df_big = timeseries_of_size( memory * memory_multiplier, dtypes={str(i): float for i in range(100)} @@ -50,12 +52,13 @@ def test_join_big_small(small_client, memory_multiplier, configure_shuffling): join = df_big.merge(df_small_pd, on="predicate", how="inner") result = join.size - wait(result, small_client, 20 * 60) + wait(result, client, 20 * 60) @pytest.mark.parametrize("persist", [True, False]) -def test_set_index(small_client, persist, memory_multiplier, configure_shuffling): - memory = cluster_memory(small_client) # 76.66 GiB +@pytest.mark.client("small") +def test_set_index(client, persist, memory_multiplier, configure_shuffling): + memory = cluster_memory(client) # 76.66 GiB df_big = timeseries_of_size( memory * memory_multiplier, dtypes={str(i): float for i in range(100)} @@ -65,7 +68,7 @@ def test_set_index(small_client, persist, memory_multiplier, configure_shuffling if persist: df_big = df_big.persist() df_indexed = df_big.set_index("0") - wait(df_indexed.size, small_client, 20 * 60) + wait(df_indexed.size, client, 20 * 60) @pytest.mark.client("uber_lyft_large") diff --git a/tests/benchmarks/test_parquet.py b/tests/benchmarks/test_parquet.py index 383078caea..1ebae03253 100644 --- a/tests/benchmarks/test_parquet.py +++ b/tests/benchmarks/test_parquet.py @@ -2,19 +2,15 @@ Parquet-related benchmarks. """ import io -import uuid import boto3 import dask.dataframe as dd import dask.datasets -import distributed import fsspec import pandas import pytest -from coiled import Cluster from packaging.version import Version -from ..conftest import dump_cluster_kwargs from ..utils_test import run_up_to_nthreads, wait try: @@ -25,37 +21,13 @@ HAS_PYARROW12 = False -@pytest.fixture(scope="module") -def parquet_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags): - kwargs = dict( - name=f"parquet-{uuid.uuid4().hex[:8]}", - environ=dask_env_variables, - tags=github_cluster_tags, - **cluster_kwargs["parquet_cluster"], - ) - dump_cluster_kwargs(kwargs, "parquet") - - with Cluster(**kwargs) as cluster: - yield cluster - - -@pytest.fixture -def parquet_client(parquet_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all): - n_workers = cluster_kwargs["parquet_cluster"]["n_workers"] - with distributed.Client(parquet_cluster) as client: - parquet_cluster.scale(n_workers) - client.wait_for_workers(n_workers) - client.restart() - with upload_cluster_dump(client), benchmark_all(client): - yield client - - @pytest.mark.xfail( HAS_PYARROW12, reason="50x slower than PyArrow 11; https://github.com/coiled/benchmarks/issues/998", ) -@run_up_to_nthreads("parquet_cluster", 100, reason="fixed dataset") -def test_read_spark_generated_data(parquet_client): +@run_up_to_nthreads("parquet", 100, reason="fixed dataset") +@pytest.mark.client("parquet") +def test_read_spark_generated_data(client): """ Read a ~15 GB subset of a ~800 GB spark-generated open dataset on AWS. @@ -70,11 +42,12 @@ def test_read_spark_generated_data(parquet_client): index="sample_id", ) coll = ddf.groupby(ddf.index).first() - wait(coll, parquet_client, 500) + wait(coll, client, 500) -@run_up_to_nthreads("parquet_cluster", 100, reason="fixed dataset") -def test_read_hive_partitioned_data(parquet_client): +@run_up_to_nthreads("parquet", 100, reason="fixed dataset") +@pytest.mark.client("parquet") +def test_read_hive_partitioned_data(client): """ Read a dataset partitioned by year and quarter. @@ -86,11 +59,12 @@ def test_read_hive_partitioned_data(parquet_client): engine="pyarrow", ) coll = ddf.groupby(["year", "quarter"]).first() - wait(coll, parquet_client, 100) + wait(coll, client, 100) -@run_up_to_nthreads("parquet_cluster", 100, reason="fixed dataset") -def test_write_wide_data(parquet_client, s3_url): +@run_up_to_nthreads("parquet", 100, reason="fixed dataset") +@pytest.mark.client("parquet") +def test_write_wide_data(client, s3_url): # Write a ~700 partition, ~200 GB dataset with a lot of columns ddf = dask.datasets.timeseries( dtypes={ @@ -107,9 +81,10 @@ def test_write_wide_data(parquet_client, s3_url): ddf.to_parquet(s3_url + "/wide-data/") -@run_up_to_nthreads("parquet_cluster", 60, reason="fixed dataset") +@run_up_to_nthreads("parquet", 60, reason="fixed dataset") @pytest.mark.parametrize("kind", ["boto3", "s3fs", "pandas", "pandas+boto3", "dask"]) -def test_download_throughput(parquet_client, kind): +@pytest.mark.client("parquet") +def test_download_throughput(client, kind): """Test throughput for downloading and parsing a single 563 MB parquet file. Note @@ -131,7 +106,7 @@ def boto3_load(path): return response["Body"].read() if kind == "boto3": - fut = parquet_client.submit(boto3_load, path) + fut = client.submit(boto3_load, path) elif kind == "s3fs": @@ -139,10 +114,10 @@ def load(path): with fsspec.open(path) as f: return f.read() - fut = parquet_client.submit(load, path) + fut = client.submit(load, path) elif kind == "pandas": - fut = parquet_client.submit(pandas.read_parquet, path, engine="pyarrow") + fut = client.submit(pandas.read_parquet, path, engine="pyarrow") elif kind == "pandas+boto3": @@ -151,9 +126,9 @@ def load(path): buf = io.BytesIO(raw) return pandas.read_parquet(buf, engine="pyarrow") - fut = parquet_client.submit(load, path) + fut = client.submit(load, path) elif kind == "dask": fut = dd.read_parquet(path, engine="pyarrow") - wait(fut, parquet_client, timeout=60) + wait(fut, client, timeout=60) diff --git a/tests/benchmarks/test_spill.py b/tests/benchmarks/test_spill.py index 204aa9a488..42dd6218f5 100644 --- a/tests/benchmarks/test_spill.py +++ b/tests/benchmarks/test_spill.py @@ -27,7 +27,7 @@ def spill_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags): }, ), tags=github_cluster_tags, - **cluster_kwargs["spill_cluster"], + **cluster_kwargs["spill"], ) dump_cluster_kwargs(kwargs, "spill") with Cluster(**kwargs) as cluster: @@ -36,7 +36,7 @@ def spill_cluster(dask_env_variables, cluster_kwargs, github_cluster_tags): @pytest.fixture def spill_client(spill_cluster, cluster_kwargs, upload_cluster_dump, benchmark_all): - n_workers = cluster_kwargs["spill_cluster"]["n_workers"] + n_workers = cluster_kwargs["spill"]["n_workers"] with Client(spill_cluster) as client: spill_cluster.scale(n_workers) client.wait_for_workers(n_workers) diff --git a/tests/benchmarks/test_work_stealing.py b/tests/benchmarks/test_work_stealing.py index a676ee39f6..057cd91aec 100644 --- a/tests/benchmarks/test_work_stealing.py +++ b/tests/benchmarks/test_work_stealing.py @@ -13,15 +13,16 @@ from ..utils_test import run_up_to_nthreads -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_trivial_workload_should_not_cause_work_stealing(small_client): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_trivial_workload_should_not_cause_work_stealing(client): root = delayed(lambda n: "x" * n)(utils.parse_bytes("1MiB"), dask_key_name="root") results = [delayed(lambda *args: None)(root, i) for i in range(10000)] - futs = small_client.compute(results) - small_client.gather(futs) + futs = client.compute(results) + client.gather(futs) -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") +@run_up_to_nthreads("small", 50, reason="fixed dataset") @pytest.mark.xfail( Version(distributed.__version__) < Version("2022.6.1"), reason="https://github.com/dask/distributed/issues/6624", @@ -71,8 +72,9 @@ def func2(chunk): _ = future.result() -@run_up_to_nthreads("small_cluster", 100, reason="fixed dataset") -def test_work_stealing_on_inhomogeneous_workload(small_client): +@run_up_to_nthreads("small", 100, reason="fixed dataset") +@pytest.mark.client("small") +def test_work_stealing_on_inhomogeneous_workload(client): np.random.seed(42) delays = np.random.lognormal(1, 1.3, 500) @@ -82,11 +84,11 @@ def clog(n): return n results = [clog(i) for i in delays] - futs = small_client.compute(results) - small_client.gather(futs) + futs = client.compute(results) + client.gather(futs) -@run_up_to_nthreads("small_cluster", 100, reason="fixed dataset") +@run_up_to_nthreads("small", 100, reason="fixed dataset") def test_work_stealing_on_straggling_worker( test_name_uuid, upload_cluster_dump, diff --git a/tests/benchmarks/test_xgboost.py b/tests/benchmarks/test_xgboost.py index f58071dbc7..a40ef1fd1b 100644 --- a/tests/benchmarks/test_xgboost.py +++ b/tests/benchmarks/test_xgboost.py @@ -27,8 +27,9 @@ def taxi_zone_lookup(): return df -@run_up_to_nthreads("small_cluster", 200, reason="fixed size dataset") -def test_preprocess(small_client, taxi_zone_lookup, read_parquet_with_pyarrow): +@run_up_to_nthreads("small", 200, reason="fixed size dataset") +@pytest.mark.client("small") +def test_preprocess(client, taxi_zone_lookup, read_parquet_with_pyarrow): """A typical workflow that preprocesses crude data into a ML-friendly dataframe""" ############ # Read input @@ -139,11 +140,12 @@ def test_preprocess(small_client, taxi_zone_lookup, read_parquet_with_pyarrow): ######## ddf = ddf.persist().repartition(partition_size="100MB") # At this point we would normally finish with to_parquet() - wait(ddf, small_client, timeout=600) + wait(ddf, client, timeout=600) -@run_up_to_nthreads("small_cluster", 200, reason="fixed size dataset") -def test_optuna_hpo(small_client): +@run_up_to_nthreads("small", 200, reason="fixed size dataset") +@pytest.mark.client("small") +def test_optuna_hpo(client): xgb = pytest.importorskip("xgboost.dask") optuna = pytest.importorskip("optuna") mean_squared_error = pytest.importorskip("dask_ml.metrics").mean_squared_error @@ -183,7 +185,7 @@ def test_optuna_hpo(small_client): y_test = test["trip_time"] # We will need to access these multiple times. Let's persist them. - x_test, y_test = small_client.persist([x_test, y_test]) + x_test, y_test = client.persist([x_test, y_test]) # Release no longer necessary objects on the cluster del ddf, train, test diff --git a/tests/benchmarks/test_zarr.py b/tests/benchmarks/test_zarr.py index 5ce265f84e..6f54fc8f02 100644 --- a/tests/benchmarks/test_zarr.py +++ b/tests/benchmarks/test_zarr.py @@ -28,31 +28,34 @@ def cmip6(): return xarray.open_dataset(store, engine="zarr", chunks={}) -@run_up_to_nthreads("small_cluster", 100, reason="fixed dataset") +@run_up_to_nthreads("small", 100, reason="fixed dataset") @pytest.mark.parametrize("threshold", [50, 100, 200, 255]) -def test_filter_then_average(small_client, zarr_dataset, threshold): +def test_filter_then_average(client, zarr_dataset, threshold): """Compute the mean for increasingly sparse boolean filters of an array""" a = zarr_dataset[zarr_dataset > threshold].mean() - wait(a, small_client, 300) + wait(a, client, 300) -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") @pytest.mark.parametrize("N", [700, 75, 1]) -def test_access_slices(small_client, zarr_dataset, N): +def test_access_slices(client, zarr_dataset, N): """Accessing just a few chunks of a zarr array should be quick""" a = zarr_dataset[:N, :N, :N] - wait(a, small_client, 300) + wait(a, client, 300) -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_sum_residuals(small_client, zarr_dataset): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_sum_residuals(client, zarr_dataset): """Compute reduce, then map, then reduce again""" a = (zarr_dataset - zarr_dataset.mean(axis=0)).sum() - wait(a, small_client, 300) + wait(a, client, 300) -@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset") -def test_select_scalar(small_client, cmip6): +@run_up_to_nthreads("small", 50, reason="fixed dataset") +@pytest.mark.client("small") +def test_select_scalar(client, cmip6): ds = cmip6.isel({"lat": 20, "lon": 40, "plev": 5, "time": 1234}).compute() assert ds.zg.shape == () assert ds.zg.size == 1 diff --git a/tests/conftest.py b/tests/conftest.py index e56085e069..9d727c777a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -152,6 +152,17 @@ def benchmark_db_session(benchmark_db_engine): yield session +def clean_test_name(name: str) -> str: + """Clean up redundant parameters created by 'client' fixture""" + return ( + name.replace("cluster0,client0", "") + .replace("[]", "") + .replace(",]", "]") + .replace("[,", "[") + .replace(",,", ",") + ) + + @pytest.fixture(scope="function") def test_run_benchmark(benchmark_db_session, request, testrun_uid): """SQLAlchemy ORM object representing a given test run. @@ -171,7 +182,7 @@ def test_run_benchmark(benchmark_db_session, request, testrun_uid): else: run = TestRun( session_id=testrun_uid, - name=request.node.name, + name=clean_test_name(request.node.name), originalname=request.node.originalname, path=str(request.node.path.relative_to(TEST_DIR)), dask_version=dask.__version__, @@ -456,17 +467,25 @@ def cluster_kwargs(): @pytest.fixture(scope="module") -def small_cluster(request, dask_env_variables, cluster_kwargs, github_cluster_tags): +def cluster(request, dask_env_variables, cluster_kwargs, github_cluster_tags): + name = request.param["name"] module = os.path.basename(request.fspath).split(".")[0] module = module.replace("test_", "") kwargs = dict( name=f"{module}-{uuid.uuid4().hex[:8]}", environ=dask_env_variables, tags=github_cluster_tags, - **cluster_kwargs["small_cluster"], + **cluster_kwargs[name], ) - dump_cluster_kwargs(kwargs, f"small_cluster.{module}") + dump_cluster_kwargs(kwargs, f"{name}.{module}") with Cluster(**kwargs) as cluster: + if request.param["upload_file"] or request.param["worker_plugin"]: + with Client(cluster) as client: + if request.param["upload_file"] is not None: + client.upload_file(request.param["upload_file"]) + if request.param["worker_plugin"] is not None: + client.register_worker_plugin(request.param["worker_plugin"]) + yield cluster @@ -477,20 +496,27 @@ def log_on_scheduler( @pytest.fixture -def small_client( +def client( request, testrun_uid, - small_cluster, + cluster, cluster_kwargs, upload_cluster_dump, benchmark_all, ): - n_workers = cluster_kwargs["small_cluster"]["n_workers"] - test_label = f"{request.node.name}, session_id={testrun_uid}" - with Client(small_cluster) as client: + n_workers = cluster_kwargs["small"]["n_workers"] + test_label = f"{clean_test_name(request.node.name)}, session_id={testrun_uid}" + with Client(cluster) as client: log_on_scheduler(client, "Starting client setup of %s", test_label) client.restart() - small_cluster.scale(n_workers) + + # Run connects to all workers once and to ensure they're up before we do + # something else. restart() can trigger a race condition that kills workers + # See https://github.com/dask/distributed/issues/7312 + # Can be removed after this issue is fixed. + client.run(lambda: None) + + cluster.scale(n_workers) client.wait_for_workers(n_workers) with upload_cluster_dump(client): @@ -506,48 +532,18 @@ def small_client( # benchmark_time, as it's beyond the scope of the test. log_on_scheduler(client, "Starting client teardown of %s", test_label) - client.restart() - # Run connects to all workers once and to ensure they're up before we do - # something else. With another call of restart when entering this - # fixture again, this can trigger a race condition that kills workers - # See https://github.com/dask/distributed/issues/7312 Can be removed - # after this issue is fixed. - client.run(lambda: None) - -@pytest.fixture -def client( - request, - dask_env_variables, - cluster_kwargs, - github_cluster_tags, - upload_cluster_dump, - benchmark_all, -): - name = request.param["name"] - with Cluster( - f"{name}-{uuid.uuid4().hex[:8]}", - environ=dask_env_variables, - tags=github_cluster_tags, - **cluster_kwargs[name], - ) as cluster: - with Client(cluster) as client: - if request.param["upload_file"] is not None: - client.upload_file(request.param["upload_file"]) - if request.param["worker_plugin"] is not None: - client.register_worker_plugin(request.param["worker_plugin"]) - with upload_cluster_dump(client), benchmark_all(client): - yield client +def _mark_cluster(name, *, upload_file=None, worker_plugin=None): + kwargs = {"name": name, "upload_file": upload_file, "worker_plugin": worker_plugin} + return pytest.mark.parametrize("cluster", [kwargs], indirect=True) def _mark_client(name, *, upload_file=None, worker_plugin=None): - return pytest.mark.parametrize( - "client", - [{"name": name, "upload_file": upload_file, "worker_plugin": worker_plugin}], - indirect=True, - ) + kwargs = {"name": name, "upload_file": upload_file, "worker_plugin": worker_plugin} + return pytest.mark.parametrize("cluster,client", [(kwargs, kwargs)], indirect=True) +pytest.mark.cluster = _mark_cluster pytest.mark.client = _mark_client @@ -642,7 +638,8 @@ def _upload_cluster_dump(client): if cluster_dump == "always" or (cluster_dump == "fail" and failed): dump_path = ( f"{s3_cluster_dump_url}/{client.cluster.name}/" - f"{test_run_benchmark.path.replace('/', '.')}.{request.node.name}" + f"{test_run_benchmark.path.replace('/', '.')}." + f"{clean_test_name(request.node.name)}" ) test_run_benchmark.cluster_dump_url = dump_path + ".msgpack.gz" logger.info( diff --git a/tests/runtime/test_cluster_creation.py b/tests/runtime/test_cluster_creation.py index b571dee696..81a2f30d4c 100644 --- a/tests/runtime/test_cluster_creation.py +++ b/tests/runtime/test_cluster_creation.py @@ -7,9 +7,8 @@ def test_default_cluster_spinup_time( benchmark_time, github_cluster_tags, get_cluster_info ): """Note: this test must be kept in a separate module from the tests that use the - small_cluster fixture (which has the scope=module) or its child small_client. - This prevents having the small_cluster sitting idle for 5+ minutes while this test - is running. + "small" client fixture. This prevents having the small cluster sitting idle for 5+ + minutes while this test is running. """ with benchmark_time: with Cluster( diff --git a/tests/runtime/test_coiled.py b/tests/runtime/test_coiled.py index 8f52cf63e5..49b20eeaae 100644 --- a/tests/runtime/test_coiled.py +++ b/tests/runtime/test_coiled.py @@ -1,8 +1,10 @@ +import pytest from coiled import Cluster -def test_cluster_reconnect(small_cluster, get_cluster_info, benchmark_time): +@pytest.mark.cluster("small") +def test_cluster_reconnect(cluster, get_cluster_info, benchmark_time): """How quickly can we reconnect to an existing cluster?""" - with get_cluster_info(small_cluster), benchmark_time: - with Cluster(name=small_cluster.name, shutdown_on_close=False): + with get_cluster_info(cluster), benchmark_time: + with Cluster(name=cluster.name, shutdown_on_close=False): pass diff --git a/tests/runtime/test_xgboost.py b/tests/runtime/test_xgboost.py index 2daadd78bb..a72efed3e8 100644 --- a/tests/runtime/test_xgboost.py +++ b/tests/runtime/test_xgboost.py @@ -5,7 +5,8 @@ xgb = pytest.importorskip("xgboost") -def test_xgboost_distributed_training(small_client): +@pytest.mark.client("small") +def test_xgboost_distributed_training(client): # `coiled-runtime=0.0.4` don't contain `dask_ml` dask_ml = pytest.importorskip("dask_ml") @@ -23,8 +24,8 @@ def test_xgboost_distributed_training(small_client): ) # Create the XGBoost DMatrix for our training and testing splits - dtrain = xgb.dask.DaskDMatrix(small_client, X_train, y_train) - dtest = xgb.dask.DaskDMatrix(small_client, X_test, y_test) + dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train) + dtest = xgb.dask.DaskDMatrix(client, X_test, y_test) # Set model parameters (XGBoost defaults) params = { @@ -36,9 +37,9 @@ def test_xgboost_distributed_training(small_client): "grow_policy": "depthwise", } output = xgb.dask.train( - small_client, params, dtrain, num_boost_round=5, evals=[(dtrain, "train")] + client, params, dtrain, num_boost_round=5, evals=[(dtrain, "train")] ) # make predictions - y_pred = xgb.dask.predict(small_client, output, dtest) + y_pred = xgb.dask.predict(client, output, dtest) assert y_pred.shape[0] == y_test.shape[0].compute() diff --git a/tests/stability/test_array.py b/tests/stability/test_array.py index ca8acf99da..2175784e75 100644 --- a/tests/stability/test_array.py +++ b/tests/stability/test_array.py @@ -21,9 +21,10 @@ sys.platform.startswith("win"), reason="scaled_array_shape fails on windows" ) @pytest.mark.skipif(not has_scipy, reason="requires scipy") -def test_ols(small_client): +@pytest.mark.client("small") +def test_ols(client): chunksize = int(1e6) - memory = cluster_memory(small_client) + memory = cluster_memory(client) target_nbytes = memory * 0.50 target_shape = scaled_array_shape(target_nbytes, ("x", 100)) num_samples, num_coeffs = target_shape[0], target_shape[-1] @@ -33,4 +34,4 @@ def test_ols(small_client): y = X @ beta + rng.normal(size=(num_samples,), chunks=(chunksize,)) beta_hat = da.linalg.solve(X.T @ X, X.T @ y) # normal eq'n y_hat = X @ beta_hat - wait(y_hat, small_client, 20 * 60) + wait(y_hat, client, 20 * 60)