Centralize client fixture
crusaderky committed Sep 28, 2023
1 parent 70dd33f commit e9e90bb
Showing 19 changed files with 231 additions and 227 deletions.
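This commit renames the cluster configurations in cluster_kwargs.yaml (small_cluster → small, parquet_cluster → parquet, spill_cluster → spill) and replaces the per-size small_client fixture with a single client fixture driven by a @pytest.mark.client(<name>) marker. The conftest.py change that defines the shared fixture is among the files not reproduced in this excerpt; as a minimal sketch of the pattern, with every name other than client, cluster_kwargs.yaml, and the marker argument assumed rather than taken from this commit, it might look like:

```python
# Hypothetical sketch only: the actual conftest.py change is not shown in this
# excerpt and the real fixture may differ. Assumes the "client" marker is
# registered in the project's pytest configuration.
import pathlib

import coiled
import pytest
import yaml
from distributed import Client


@pytest.fixture
def client(request):
    # @pytest.mark.client("small") names a section of cluster_kwargs.yaml.
    marker = request.node.get_closest_marker("client")
    if marker is None:
        raise TypeError("Test must be decorated with @pytest.mark.client(<name>)")
    name = marker.args[0]

    config = yaml.safe_load(pathlib.Path("cluster_kwargs.yaml").read_text())
    kwargs = {**config.get("default", {}), **config[name]}  # section overrides defaults

    # Assumption: the YAML entries map directly onto coiled.Cluster keyword arguments.
    with coiled.Cluster(name=f"benchmarks-{name}", **kwargs) as cluster:
        with Client(cluster) as client:
            yield client
```

The apparent benefit of centralizing the fixture is that a new cluster shape only needs a new YAML entry plus a marker argument, rather than a new fixture per shape.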
2 changes: 1 addition & 1 deletion AB_environments/AB_sample.cluster.yaml
@@ -9,6 +9,6 @@
# Overrides ../cluster_kwargs.yaml.
# Leave empty if you don't want to override anything.

# small_cluster:
# small:
# n_workers: 5
# worker_vm_types: [m6i.xlarge] # 4CPU, 16GiB
21 changes: 10 additions & 11 deletions cluster_kwargs.yaml
@@ -20,18 +20,18 @@ default:
spot_on_demand_fallback: true
multizone: true

# For all tests using the small_client fixture
small_cluster:
# For all tests using the @pytest.mark.client("small") fixture
small:
n_workers: 10
worker_vm_types: [m6i.large] # 2CPU, 8GiB

# For tests/benchmarks/test_parquet.py
parquet_cluster:
parquet:
n_workers: 15
worker_vm_types: [m5.xlarge] # 4 CPU, 16 GiB

# For tests/benchmarks/test_spill.py
spill_cluster:
spill:
n_workers: 5
worker_disk_size: 64
worker_vm_types: [m6i.large] # 2CPU, 8GiB
@@ -72,6 +72,12 @@ snowflake:
n_workers: 20
worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)

# For tests/workflows/test_from_csv_to_parquet.py
from_csv_to_parquet:
n_workers: 10
worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)
backend_options:
region: "us-east-1" # Same region as dataset

# Specific tests
test_work_stealing_on_scaling_up:
@@ -85,10 +91,3 @@ test_work_stealing_on_straggling_worker:
test_repeated_merge_spill:
n_workers: 20
worker_vm_types: [m6i.large]

# For tests/workflows/test_from_csv_to_parquet.py
from_csv_to_parquet:
n_workers: 10
worker_vm_types: [m6i.xlarge] # 4 CPU, 16 GiB (preferred default instance)
backend_options:
region: "us-east-1" # Same region as dataset
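The section names above are exactly what the new marker takes as its argument. The files that consume the non-default sections (test_parquet.py, test_spill.py, test_from_csv_to_parquet.py) are not included in this excerpt, so the decorator below is an assumed illustration of the mapping rather than a line from the commit:

```python
# Assumed usage, not part of the diff shown here: the marker argument matches the
# YAML key, so this test would run on the 10-worker us-east-1 cluster defined above.
import pytest


@pytest.mark.client("from_csv_to_parquet")
def test_from_csv_to_parquet(client):
    ...  # real test body lives in tests/workflows/test_from_csv_to_parquet.py
```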
72 changes: 42 additions & 30 deletions tests/benchmarks/test_array.py
@@ -18,11 +18,12 @@
)


def test_anom_mean(small_client, new_array):
@pytest.mark.client("small")
def test_anom_mean(client, new_array):
"""From https://github.com/dask/distributed/issues/2602#issuecomment-498718651"""
xarray = pytest.importorskip("xarray")

memory = cluster_memory(small_client) # 76.66 GiB
memory = cluster_memory(client) # 76.66 GiB
target_nbytes = memory // 2
data = new_array(
scaled_array_shape(target_nbytes, ("x", "10MiB")),
@@ -42,9 +43,10 @@ def test_anom_mean(small_client, new_array):
anom = arr.groupby("day") - clim
anom_mean = anom.mean(dim="time")

wait(anom_mean, small_client, 10 * 60)
wait(anom_mean, client, 10 * 60)


@pytest.mark.client("small")
@pytest.mark.parametrize(
"speed,chunk_shape",
[
@@ -53,7 +55,7 @@ def test_anom_mean(small_client, new_array):
("slow", "square"),
],
)
def test_basic_sum(small_client, speed, chunk_shape):
def test_basic_sum(client, speed, chunk_shape):
"""From https://github.com/dask/distributed/pull/4864
n-step map-reduce:
@@ -90,7 +92,7 @@ def test_basic_sum(small_client, speed, chunk_shape):
else:
chunks = (3350, 3925) # 100.32 MiB square-ish chunks

memory = cluster_memory(small_client) # 76.66 GiB
memory = cluster_memory(client) # 76.66 GiB
target_nbytes = memory * 5
data = da.zeros(
scaled_array_shape(target_nbytes, ("100MiB", "x")),
@@ -111,17 +113,18 @@ def slow_map(x):

result = da.sum(data, axis=1)

wait(result, small_client, 10 * 60)
wait(result, client, 10 * 60)


@pytest.mark.skip(
"fails in actual CI; see https://github.com/coiled/benchmarks/issues/253"
)
def test_climatic_mean(small_client, new_array):
@pytest.mark.client("small")
def test_climatic_mean(client, new_array):
"""From https://github.com/dask/distributed/issues/2602#issuecomment-535009454"""
xarray = pytest.importorskip("xarray")

memory = cluster_memory(small_client) # 76.66 GiB
memory = cluster_memory(client) # 76.66 GiB
target_nbytes = memory * 2
chunks = (1, 1, 96, 21, 90, 144)
shape = (28, "x", 96, 21, 90, 144)
@@ -138,11 +141,12 @@ def test_climatic_mean(small_client, new_array):
# arr_clim = array.groupby("init_date.month").mean(dim="init_date")
arr_clim = array.groupby("init_date").mean(dim="init_date")

wait(arr_clim, small_client, 15 * 60)
wait(arr_clim, client, 15 * 60)


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
def test_quadratic_mean(small_client):
@pytest.mark.client("small")
@run_up_to_nthreads("small", 50, reason="fixed dataset")
def test_quadratic_mean(client):
# https://github.com/pangeo-data/distributed-array-examples/issues/2
xr = pytest.importorskip("xarray")

@@ -163,13 +167,14 @@ def test_quadratic_mean(small_client):
quad["uv"] = ds.anom_u * ds.anom_v
mean = quad.mean("time")
# Mean is really small at this point so we can just fetch it
wait(mean, small_client, 15 * 60)
wait(mean, client, 15 * 60)


def test_vorticity(small_client, new_array):
@pytest.mark.client("small")
def test_vorticity(client, new_array):
# From https://github.com/dask/distributed/issues/6571

memory = cluster_memory(small_client) # 76.66 GiB
memory = cluster_memory(client) # 76.66 GiB
target_nbytes = int(memory * 0.85)
shape = scaled_array_shape(target_nbytes, (5000, 5000, "x"))

@@ -209,12 +214,13 @@ def pad_rechunk(arr):
vp = pad_rechunk(v)
result = dx[..., None] * up - dy[..., None] * vp

wait(arr_to_devnull(result), small_client, 10 * 60)
wait(arr_to_devnull(result), client, 10 * 60)


def test_double_diff(small_client, new_array):
@pytest.mark.client("small")
def test_double_diff(client, new_array):
# Variant of https://github.com/dask/distributed/issues/6597
memory = cluster_memory(small_client) # 76.66 GiB
memory = cluster_memory(client) # 76.66 GiB
# FIXME https://github.com/coiled/benchmarks/issues/564
# this algorithm is supposed to scale linearly!
shape = scaled_array_shape_quadratic(memory, "76.66 GiB", ("x", "x"))
@@ -224,24 +230,26 @@ def test_double_diff(small_client, new_array):
print_size_info(memory, memory, a, b)

diff = a[1:, 1:] - b[:-1, :-1]
wait(arr_to_devnull(diff), small_client, 10 * 60)
wait(arr_to_devnull(diff), client, 10 * 60)


def test_dot_product(small_client, new_array):
@pytest.mark.client("small")
def test_dot_product(client, new_array):
"""See also test_spill.py::test_dot_product_spill
for variant that hits the spill threshold
"""
memory = cluster_memory(small_client) # 76.66 GiB
memory = cluster_memory(client) # 76.66 GiB
shape = scaled_array_shape_quadratic(memory // 17, "4.5 GiB", ("x", "x"))
a = new_array(shape, chunks="128 MiB")
print_size_info(memory, memory // 17, a)
# Input 1: 4.51 GiB - 49 128.00 MiB chunks

b = (a @ a.T).sum()
wait(b, small_client, 10 * 60)
wait(b, client, 10 * 60)


def test_map_overlap_sample(small_client, new_array):
@pytest.mark.client("small")
def test_map_overlap_sample(client, new_array):
"""
This is from Napari like workloads where they have large images and
commonly use map_overlap. They care about rapid (sub-second) access to
@@ -255,30 +263,34 @@ def test_map_overlap_sample(small_client, new_array):
y[5000:5010, 5000:5010].compute()


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
def test_rechunk_in_memory(small_client, configure_rechunking):
@run_up_to_nthreads("small", 50, reason="fixed dataset")
@pytest.mark.client("small")
def test_rechunk_in_memory(client, configure_rechunking):
rng = da.random.default_rng()
x = rng.random((50000, 50000))
x.rechunk((50000, 20)).rechunk((20, 50000)).sum().compute()


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
def test_rechunk_striping(small_client, configure_rechunking):
@run_up_to_nthreads("small", 50, reason="fixed dataset")
@pytest.mark.client("small")
def test_rechunk_striping(client, configure_rechunking):
rng = da.random.default_rng()
x = rng.random((100_000, 100_000))
x.rechunk((100_000, 100)).rechunk((100, 100_000)).sum().compute() # ~76 MiB chunks


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
def test_rechunk_swap_axes(small_client, configure_rechunking):
@run_up_to_nthreads("small", 50, reason="fixed dataset")
@pytest.mark.client("small")
def test_rechunk_swap_axes(client, configure_rechunking):
rng = da.random.default_rng()
x = rng.random((100_000, 100_000), chunks=(100_000, 100))
x.rechunk((100, 100_000)).sum().compute() # ~76 MiB chunks


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
@run_up_to_nthreads("small", 50, reason="fixed dataset")
@pytest.mark.skip(reason="this runs forever")
def test_rechunk_out_of_memory(small_client, configure_rechunking):
@pytest.mark.client("small")
def test_rechunk_out_of_memory(client, configure_rechunking):
rng = da.random.default_rng()
x = rng.random((100000, 100000))
x.rechunk((50000, 20)).rechunk((20, 50000)).sum().compute()
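Note that run_up_to_nthreads now receives the bare YAML key ("small") instead of the old "small_cluster" name, keeping the thread-count guard in sync with the client marker. The helper itself lives in tests/utils_test.py and is not part of this excerpt; a plausible sketch, assuming it derives the thread count from cluster_kwargs.yaml and a hard-coded threads-per-VM table, is:

```python
# Plausible sketch only: the real run_up_to_nthreads is not shown in this diff.
# It skips a test when the named cluster config would provide more threads than
# the fixed dataset can usefully exercise.
import pathlib

import pytest
import yaml

# Hypothetical mapping; the real helper may derive this from the VM type instead.
THREADS_PER_WORKER = {"m6i.large": 2, "m5.xlarge": 4, "m6i.xlarge": 4}


def run_up_to_nthreads(config_name: str, max_threads: int, *, reason: str):
    config = yaml.safe_load(pathlib.Path("cluster_kwargs.yaml").read_text())[config_name]
    vm_type = config["worker_vm_types"][0]
    nthreads = config["n_workers"] * THREADS_PER_WORKER[vm_type]
    return pytest.mark.skipif(
        nthreads > max_threads,
        reason=f"cluster {config_name!r} has {nthreads} threads > {max_threads}: {reason}",
    )
```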
6 changes: 4 additions & 2 deletions tests/benchmarks/test_csv.py
@@ -1,11 +1,13 @@
import dask.dataframe as dd
import pandas as pd
import pytest

from ..utils_test import run_up_to_nthreads


@run_up_to_nthreads("small_cluster", 50, reason="fixed dataset")
def test_csv_basic(small_client):
@run_up_to_nthreads("small", 50, reason="fixed dataset")
@pytest.mark.client("small")
def test_csv_basic(client):
ddf = dd.read_csv(
"s3://coiled-runtime-ci/nyc-tlc/yellow_tripdata_2019_csv/yellow_tripdata_2019-*.csv",
dtype={
14 changes: 5 additions & 9 deletions tests/benchmarks/test_custom.py
@@ -1,17 +1,17 @@
import random
import time

import pytest
from dask import delayed
from dask.utils import parse_bytes

from ..utils_test import wait


def test_jobqueue(small_client):
@pytest.mark.client("small")
def test_jobqueue(client):
# Just using dask to run lots of embarrassingly-parallel CPU-bound tasks as fast as possible
nthreads = sum(
w["nthreads"] for w in small_client.scheduler_info()["workers"].values()
)
nthreads = sum(w["nthreads"] for w in client.scheduler_info()["workers"].values())
max_runtime = 120
max_sleep = 3
n_tasks = round(max_runtime / max_sleep * nthreads)
@@ -26,8 +26,4 @@ def task(i: int) -> int:
tasks = [task(i) for i in range(n_tasks)]
result = delayed(sum)(tasks) # just so we have a single object

wait(
result,
small_client,
max_runtime * 1.15,
)
wait(result, client, max_runtime * 1.15)
29 changes: 17 additions & 12 deletions tests/benchmarks/test_dataframe.py
@@ -1,3 +1,4 @@
import pytest
from dask.sizeof import sizeof
from dask.utils import format_bytes

@@ -15,8 +16,9 @@ def print_dataframe_info(df):
)


def test_dataframe_align(small_client):
memory = cluster_memory(small_client) # 76.66 GiB
@pytest.mark.client("small")
def test_dataframe_align(client):
memory = cluster_memory(client) # 76.66 GiB

df = timeseries_of_size(
memory // 2,
@@ -39,11 +41,12 @@ def test_dataframe_align(small_client):
# ~25,488,000 rows x 100 columns, 19.18 GiB total, 354 55.48 MiB partitions

final = (df2 - df).mean() # will be all NaN, just forcing alignment
wait(final, small_client, 10 * 60)
wait(final, client, 10 * 60)


def test_shuffle(small_client, configure_shuffling, memory_multiplier):
memory = cluster_memory(small_client) # 76.66 GiB
@pytest.mark.client("small")
def test_shuffle(client, configure_shuffling, memory_multiplier):
memory = cluster_memory(client) # 76.66 GiB

df = timeseries_of_size(
memory * memory_multiplier,
@@ -57,20 +60,22 @@ def test_shuffle(small_client, configure_shuffling, memory_multiplier):

shuf = df.shuffle("0").map_partitions(lambda x: x)
result = shuf.size
wait(result, small_client, 20 * 60)
wait(result, client, 20 * 60)


def test_filter(small_client):
@pytest.mark.client("small")
def test_filter(client):
"""How fast can we filter a DataFrame?"""
memory = cluster_memory(small_client)
memory = cluster_memory(client)
df = timeseries_of_size(memory)
name = df.head(1).name.iloc[0] # Get first name that appears
result = df[df.name == name]
wait(result, small_client, 10 * 60)
wait(result, client, 10 * 60)


def test_dataframe_cow_chain(small_client):
memory = cluster_memory(small_client) # 76.66 GiB
@pytest.mark.client("small")
def test_dataframe_cow_chain(client):
memory = cluster_memory(client) # 76.66 GiB

df = timeseries_of_size(
memory // 2,
@@ -92,4 +97,4 @@ def test_dataframe_cow_chain(small_client):
.astype({50: "float"})
.loc[:, slice(2, 100)]
)
wait(result, small_client, 10 * 60)
wait(result, client, 10 * 60)