diff --git a/datashader/compiler.py b/datashader/compiler.py index 4e89a4cc2..58cb2c84f 100644 --- a/datashader/compiler.py +++ b/datashader/compiler.py @@ -140,7 +140,7 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti create = make_create(bases, dshapes, cuda) append, any_uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias) - info = make_info(cols, any_uses_cuda_mutex) + info = make_info(cols, cuda, any_uses_cuda_mutex) combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned) finalize = make_finalize(bases, agg, schema, cuda, partitioned) @@ -302,9 +302,9 @@ def make_create(bases, dshapes, cuda): return lambda shape: tuple(c(shape, array_module) for c in creators) -def make_info(cols, uses_cuda_mutex: bool): +def make_info(cols, cuda, uses_cuda_mutex: bool): def info(df, canvas_shape): - ret = tuple(c.apply(df) for c in cols) + ret = tuple(c.apply(df, cuda) for c in cols) if uses_cuda_mutex: import cupy # Guaranteed to be available if uses_cuda_mutex is True import numba diff --git a/datashader/core.py b/datashader/core.py index b7b9c5cfc..aa7119614 100644 --- a/datashader/core.py +++ b/datashader/core.py @@ -345,7 +345,7 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None, """ from .glyphs import (LineAxis0, LinesAxis1, LinesAxis1XConstant, LinesAxis1YConstant, LineAxis0Multi, - LinesAxis1Ragged, LineAxis1Geometry) + LinesAxis1Ragged, LineAxis1Geometry, LinesXarrayCommonX) validate_xy_or_geometry('Line', x, y, geometry) @@ -383,6 +383,20 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None, "dask_geopandas.GeoDataFrame. Received objects of type {typ}".format( typ=type(source))) + elif isinstance(source, Dataset) and isinstance(x, str) and isinstance(y, str): + x_arr = source[x] + y_arr = source[y] + if x_arr.ndim != 1: + raise ValueError(f"x array must have 1 dimension not {x_arr.ndim}") + + if y_arr.ndim != 2: + raise ValueError(f"y array must have 2 dimensions not {y_arr.ndim}") + if x not in y_arr.dims: + raise ValueError("x must be one of the coordinate dimensions of y") + + y_coord_dims = list(y_arr.coords.dims) + x_dim_index = y_coord_dims.index(x) + glyph = LinesXarrayCommonX(x, y, x_dim_index) else: # Broadcast column specifications to handle cases where # x is a list and y is a string or vice versa diff --git a/datashader/data_libraries/dask_xarray.py b/datashader/data_libraries/dask_xarray.py index 0062a3f61..d062a1a7a 100644 --- a/datashader/data_libraries/dask_xarray.py +++ b/datashader/data_libraries/dask_xarray.py @@ -1,5 +1,6 @@ from datashader.compiler import compile_components from datashader.utils import Dispatcher +from datashader.glyphs.line import LinesXarrayCommonX from datashader.glyphs.quadmesh import ( QuadMeshRaster, QuadMeshRectilinear, QuadMeshCurvilinear, build_scale_translate ) @@ -349,6 +350,82 @@ def chunk(np_zs, np_x_centers, np_y_centers): return dsk, result_name +def dask_xarray_lines( + glyph: LinesXarrayCommonX, xr_ds: xr.Dataset, schema, canvas, summary, + *, antialias=False, cuda=False, +): + shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph) + + # Compile functions + create, info, append, combine, finalize, antialias_stage_2, antialias_stage_2_funcs, \ + column_names = compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda, + partitioned=True) + x_mapper = canvas.x_axis.mapper + y_mapper = canvas.y_axis.mapper + extend = glyph._build_extend( + x_mapper, y_mapper, info, append, antialias_stage_2, antialias_stage_2_funcs) + x_range = bounds[:2] + y_range = bounds[2:] + + x_name = glyph.x + x_dim_index = glyph.x_dim_index + other_dim_index = 1 - x_dim_index + other_dim_name = xr_ds[glyph.y].coords.dims[other_dim_index] + xs = xr_ds[x_name] + + # Build chunk offsets for coordinates + chunk_offsets = {} + for k, chunks in xr_ds.chunks.items(): + chunk_offsets[k] = [0] + list(np.cumsum(chunks)) + + partitioned = True + uses_row_index = summary.uses_row_index(cuda, partitioned) + + def chunk(np_array, *chunk_indices): + aggs = create(shape) + + start_x_index = chunk_offsets[x_name][chunk_indices[x_dim_index]] + end_x_index = start_x_index + np_array.shape[x_dim_index] + x = xs[start_x_index:end_x_index].values + + start_other_index = chunk_offsets[other_dim_name][chunk_indices[other_dim_index]] + end_other_index = start_other_index + np_array.shape[other_dim_index] + + data_vars = dict( + name=(("x", other_dim_name) if x_dim_index == 0 else (other_dim_name, "x"), np_array), + ) + # Other required columns are chunked in the other_dim + for column_name in column_names: + values = xr_ds[column_name][start_other_index:end_other_index].values + data_vars[column_name] = (other_dim_name, values) + + chunk_ds = xr.Dataset( + data_vars=data_vars, + coords=dict( + x=("x", x), + other_dim_name=(other_dim_name, np.arange(start_other_index, end_other_index)), + ), + ) + + if uses_row_index: + row_offset = start_other_index + chunk_ds.attrs["_datashader_row_offset"] = row_offset + chunk_ds.attrs["_datashader_row_length"] = end_other_index - start_other_index + + extend(aggs, chunk_ds, st, bounds) + return aggs + + name = tokenize(xr_ds.__dask_tokenize__(), canvas, glyph, summary) + keys = [k for row in xr_ds.__dask_keys__()[0] for k in row] + keys2 = [(name, i) for i in range(len(keys))] + dsk = dict((k2, (chunk, k, k[1], k[2])) for (k2, k) in zip(keys2, keys)) + dsk[name] = (apply, finalize, [(combine, keys2)], + dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label], + attrs=dict(x_range=x_range, y_range=y_range))) + return dsk, name + + dask_glyph_dispatch.register(QuadMeshRectilinear)(dask_rectilinear) dask_glyph_dispatch.register(QuadMeshRaster)(dask_raster) dask_glyph_dispatch.register(QuadMeshCurvilinear)(dask_curvilinear) +dask_glyph_dispatch.register(LinesXarrayCommonX)(dask_xarray_lines) diff --git a/datashader/data_libraries/pandas.py b/datashader/data_libraries/pandas.py index d52d8f39e..f93ab5320 100644 --- a/datashader/data_libraries/pandas.py +++ b/datashader/data_libraries/pandas.py @@ -6,6 +6,7 @@ from datashader.compiler import compile_components from datashader.glyphs.points import _PointLike, _GeometryLike from datashader.glyphs.area import _AreaToLineLike +from datashader.glyphs.line import LinesXarrayCommonX from datashader.utils import Dispatcher __all__ = () @@ -46,6 +47,15 @@ def default(glyph, source, schema, canvas, summary, *, antialias=False, cuda=Fal bases = create((height, width)) + if isinstance(glyph, LinesXarrayCommonX) and summary.uses_row_index(cuda, partitioned=False): + # Need to use a row index and extract.apply() doesn't have enough + # information to determine the coordinate length itself so do so here + # and pass it along as an xarray attribute in the usual manner. + other_dim_index = 1 - glyph.x_dim_index + other_dim_name = source[glyph.y].coords.dims[other_dim_index] + length = len(source[other_dim_name]) + source = source.assign_attrs(_datashader_row_offset=0, _datashader_row_length=length) + extend(bases, source, x_st + y_st, x_range + y_range) return finalize(bases, diff --git a/datashader/data_libraries/xarray.py b/datashader/data_libraries/xarray.py index dd496500b..ca4a78680 100644 --- a/datashader/data_libraries/xarray.py +++ b/datashader/data_libraries/xarray.py @@ -1,4 +1,5 @@ from __future__ import annotations +from datashader.glyphs.line import LinesXarrayCommonX from datashader.glyphs.quadmesh import _QuadMeshLike from datashader.data_libraries.pandas import default from datashader.core import bypixel @@ -16,7 +17,13 @@ @bypixel.pipeline.register(xr.Dataset) def xarray_pipeline(xr_ds, schema, canvas, glyph, summary, *, antialias=False): - cuda = cupy and isinstance(xr_ds[glyph.name].data, cupy.ndarray) + cuda = False + if cupy: + if isinstance(glyph, LinesXarrayCommonX): + cuda = isinstance(xr_ds[glyph.y].data, cupy.ndarray) + else: + cuda = isinstance(xr_ds[glyph.name].data, cupy.ndarray) + if not xr_ds.chunks: return glyph_dispatch( glyph, xr_ds, schema, canvas, summary, antialias=antialias, cuda=cuda) @@ -28,3 +35,4 @@ def xarray_pipeline(xr_ds, schema, canvas, glyph, summary, *, antialias=False): # Default to default pandas implementation glyph_dispatch.register(_QuadMeshLike)(default) +glyph_dispatch.register(LinesXarrayCommonX)(default) diff --git a/datashader/glyphs/__init__.py b/datashader/glyphs/__init__.py index 2ec5df5a4..63b8cc013 100644 --- a/datashader/glyphs/__init__.py +++ b/datashader/glyphs/__init__.py @@ -9,6 +9,7 @@ LinesAxis1Ragged, LineAxis1Geometry, LineAxis1GeoPandas, + LinesXarrayCommonX, ) from .area import ( # noqa (API import) AreaToZeroAxis0, diff --git a/datashader/glyphs/glyph.py b/datashader/glyphs/glyph.py index 34f3176f9..4824a7bb4 100644 --- a/datashader/glyphs/glyph.py +++ b/datashader/glyphs/glyph.py @@ -7,6 +7,7 @@ import numpy as np import pandas as pd +import xarray as xr from datashader.utils import Expr, ngjit from datashader.macros import expand_varargs @@ -54,6 +55,11 @@ def _compute_bounds(s): return (s.min(), s.max()) elif isinstance(s, pd.Series): return Glyph._compute_bounds_numba(s.values) + elif isinstance(s, xr.DataArray): + if cp and isinstance(s.data, cp.ndarray): + return (s.min().item(), s.max().item()) + else: + return Glyph._compute_bounds_numba(s.values.ravel()) else: return Glyph._compute_bounds_numba(s) diff --git a/datashader/glyphs/line.py b/datashader/glyphs/line.py index 48d72c323..6b7be92be 100644 --- a/datashader/glyphs/line.py +++ b/datashader/glyphs/line.py @@ -16,6 +16,7 @@ from ..transfer_functions._cuda_utils import cuda_args except ImportError: cudf = None + cp = None cuda_args = None try: @@ -607,7 +608,78 @@ def extend(aggs, df, vt, bounds, plot_start=True): perform_extend_cpu( sx, tx, sy, ty, xmin, xmax, ymin, ymax, - geom_array, antialias_stage_2, *aggs_and_cols + geom_array, antialias_stage_2, *aggs_and_cols, + ) + + return extend + + +class LinesXarrayCommonX(LinesAxis1): + def __init__(self, x, y, x_dim_index: int): + super().__init__(x, y) + self.x_dim_index = x_dim_index + + def __hash__(self): + # This ensures that @memoize below caches different functions for different x_dim_index. + return hash((type(self), self.x_dim_index)) + + def compute_x_bounds(self, dataset): + bounds = self._compute_bounds(dataset[self.x]) + return self.maybe_expand_bounds(bounds) + + def compute_y_bounds(self, dataset): + bounds = self._compute_bounds(dataset[self.y]) + return self.maybe_expand_bounds(bounds) + + def compute_bounds_dask(self, xr_ds): + return self.compute_x_bounds(xr_ds), self.compute_y_bounds(xr_ds) + + def validate(self, in_dshape): + if not isreal(in_dshape.measure[str(self.x)]): + raise ValueError('x column must be real') + + if not isreal(in_dshape.measure[str(self.y)]): + raise ValueError('y column must be real') + + @memoize + def _internal_build_extend( + self, x_mapper, y_mapper, info, append, line_width, antialias_stage_2, + antialias_stage_2_funcs, + ): + expand_aggs_and_cols = self.expand_aggs_and_cols(append) + draw_segment, antialias_stage_2_funcs = _line_internal_build_extend( + x_mapper, y_mapper, append, line_width, antialias_stage_2, antialias_stage_2_funcs, + expand_aggs_and_cols, + ) + swap_dims = self.x_dim_index == 0 + extend_cpu, extend_cuda = _build_extend_line_axis1_x_constant( + draw_segment, expand_aggs_and_cols, antialias_stage_2_funcs, swap_dims, + ) + + x_name = self.x + y_name = self.y + + def extend(aggs, df, vt, bounds, plot_start=True): + sx, tx, sy, ty = vt + xmin, xmax, ymin, ymax = bounds + aggs_and_cols = aggs + info(df, aggs[0].shape[:2]) + + if cudf and isinstance(df, cudf.DataFrame): + xs = cp.asarray(df[x_name]) + ys = cp.asarray(df[y_name]) + do_extend = extend_cuda[cuda_args(ys.shape)] + elif cp and isinstance(df[y_name].data, cp.ndarray): + xs = cp.asarray(df[x_name]) + ys = df[y_name].data + shape = ys.shape[::-1] if swap_dims else ys.shape + do_extend = extend_cuda[cuda_args(shape)] + else: + xs = df[x_name].to_numpy() + ys = df[y_name].to_numpy() + do_extend = extend_cpu + + do_extend( + sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2, *aggs_and_cols ) return extend @@ -1263,7 +1335,7 @@ def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_ def _build_extend_line_axis1_x_constant(draw_segment, expand_aggs_and_cols, - antialias_stage_2_funcs): + antialias_stage_2_funcs, swap_dims: bool = False): if antialias_stage_2_funcs is not None: aa_stage_2_accumulate, aa_stage_2_clear, aa_stage_2_copy_back = antialias_stage_2_funcs use_2_stage_agg = antialias_stage_2_funcs is not None @@ -1274,22 +1346,24 @@ def perform_extend_line( i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols ): x0 = xs[j] - y0 = ys[i, j] x1 = xs[j + 1] - y1 = ys[i, j + 1] - - segment_start = ( - (j == 0) or isnull(xs[j - 1]) or isnull(ys[i, j - 1]) - ) - - segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[i, j+2]) + if swap_dims: + y0 = ys[j, i] + y1 = ys[j + 1, i] + segment_start = (j == 0) or isnull(xs[j - 1]) or isnull(ys[j - 1, i]) + segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[j+2, i]) + else: + y0 = ys[i, j] + y1 = ys[i, j + 1] + segment_start = (j == 0) or isnull(xs[j - 1]) or isnull(ys[i, j - 1]) + segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[i, j+2]) if segment_start or use_2_stage_agg: xm = 0.0 ym = 0.0 else: xm = xs[j-1] - ym = ys[i, j-1] + ym = ys[j-1, i] if swap_dims else ys[i, j-1] draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax, segment_start, segment_end, x0, x1, y0, y1, @@ -1301,8 +1375,8 @@ def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2 *aggs_and_cols): antialias = antialias_stage_2 is not None buffer = np.empty(8) if antialias else None - ncols = ys.shape[1] - for i in range(ys.shape[0]): + ncols, nrows = ys.shape if swap_dims else ys.shape[::-1] + for i in range(nrows): for j in range(ncols - 1): perform_extend_line( i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols @@ -1347,10 +1421,10 @@ def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_ antialias = antialias_stage_2 is not None buffer = cuda.local.array(8, nb_types.float64) if antialias else None i, j = cuda.grid(2) - if i < ys.shape[0] and j < ys.shape[1] - 1: + ncols, nrows = ys.shape if swap_dims else ys.shape[::-1] + if i < nrows and j < ncols - 1: perform_extend_line( - i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, - buffer, *aggs_and_cols + i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols ) if use_2_stage_agg: diff --git a/datashader/reductions.py b/datashader/reductions.py index 042953be0..a41900978 100644 --- a/datashader/reductions.py +++ b/datashader/reductions.py @@ -84,14 +84,20 @@ def nan_check_column(self): class extract(Preprocess): """Extract a column from a dataframe as a numpy array of values.""" - def apply(self, df): + def apply(self, df, cuda): if self.column is SpecialColumn.RowIndex: - attrs = getattr(df, "attrs", None) - row_offset = getattr(attrs or df, "_datashader_row_offset", 0) + attr_name = "_datashader_row_offset" + if isinstance(df, xr.Dataset): + row_offset = df.attrs[attr_name] + row_length = df.attrs["_datashader_row_length"] + else: + attrs = getattr(df, "attrs", None) + row_offset = getattr(attrs or df, attr_name, 0) + row_length = len(df) if cudf and isinstance(df, cudf.DataFrame): if self.column is SpecialColumn.RowIndex: - return cp.arange(row_offset, row_offset+len(df), dtype=np.int64) + return cp.arange(row_offset, row_offset + row_length, dtype=np.int64) if df[self.column].dtype.kind == 'f': nullval = np.nan @@ -100,11 +106,16 @@ def apply(self, df): if Version(cudf.__version__) >= Version("22.02"): return df[self.column].to_cupy(na_value=nullval) return cp.array(df[self.column].to_gpu_array(fillna=nullval)) - elif isinstance(df, xr.Dataset): - # DataArray could be backed by numpy or cupy array - return df[self.column].data elif self.column is SpecialColumn.RowIndex: - return np.arange(row_offset, row_offset+len(df), dtype=np.int64) + if cuda: + return cp.arange(row_offset, row_offset + row_length, dtype=np.int64) + else: + return np.arange(row_offset, row_offset + row_length, dtype=np.int64) + elif isinstance(df, xr.Dataset): + if cuda and not isinstance(df[self.column].data, cp.ndarray): + return cp.asarray(df[self.column]) + else: + return df[self.column].data else: return df[self.column].values @@ -124,7 +135,7 @@ def validate(self, in_dshape): """Validates input shape""" raise NotImplementedError("validate not implemented") - def apply(self, df): + def apply(self, df, cuda): """Applies preprocessor to DataFrame and returns array""" raise NotImplementedError("apply not implemented") @@ -149,7 +160,7 @@ def validate(self, in_dshape): if not isinstance(in_dshape.measure[self.column], ct.Categorical): raise ValueError("input must be categorical") - def apply(self, df): + def apply(self, df, cuda): if cudf and isinstance(df, cudf.DataFrame): if Version(cudf.__version__) >= Version("22.02"): return df[self.column].cat.codes.to_cupy() @@ -185,7 +196,7 @@ def validate(self, in_dshape): if in_dshape.measure[self.column] not in self.IntegerTypes: raise ValueError("input must be an integer column") - def apply(self, df): + def apply(self, df, cuda): result = (df[self.column] - self.offset) % self.modulo if cudf and isinstance(df, cudf.Series): if Version(cudf.__version__) >= Version("22.02"): @@ -226,7 +237,7 @@ def validate(self, in_dshape): if self.column not in in_dshape.dict: raise ValueError("specified column not found") - def apply(self, df): + def apply(self, df, cuda): if cudf and isinstance(df, cudf.DataFrame): if Version(cudf.__version__) >= Version("22.02"): values = df[self.column].to_cupy(na_value=cp.nan) @@ -269,8 +280,8 @@ def categories(self, input_dshape): def validate(self, in_dshape): return self.categorizer.validate(in_dshape) - def apply(self, df): - a = self.categorizer.apply(df) + def apply(self, df, cuda): + a = self.categorizer.apply(df, cuda) if cudf and isinstance(df, cudf.DataFrame): import cupy if self.column == SpecialColumn.RowIndex: @@ -281,7 +292,7 @@ def apply(self, df): nullval = 0 a = cupy.asarray(a) if self.column == SpecialColumn.RowIndex: - b = extract(SpecialColumn.RowIndex).apply(df) + b = extract(SpecialColumn.RowIndex).apply(df, cuda) elif Version(cudf.__version__) >= Version("22.02"): b = df[self.column].to_cupy(na_value=nullval) else: @@ -289,7 +300,7 @@ def apply(self, df): return cupy.stack((a, b), axis=-1) else: if self.column == SpecialColumn.RowIndex: - b = extract(SpecialColumn.RowIndex).apply(df) + b = extract(SpecialColumn.RowIndex).apply(df, cuda) else: b = df[self.column].values return np.stack((a, b), axis=-1) @@ -2023,11 +2034,13 @@ def wrapped_combine(aggs, selector_aggs): if len(aggs) == 1: pass elif cuda: + assert len(aggs) == 2 is_n_reduction = isinstance(self.selector, FloatingNReduction) shape = aggs[0].shape[:-1] if is_n_reduction else aggs[0].shape combine[cuda_args(shape)](aggs, selector_aggs) else: - combine(aggs, selector_aggs) + for i in range(1, len(aggs)): + combine((aggs[0], aggs[i]), (selector_aggs[0], selector_aggs[i])) return aggs[0], selector_aggs[0] @@ -2182,10 +2195,11 @@ def _append_cuda(x, y, agg, field): def _combine(aggs): # Maximum ignoring -1 values # Works for CPU and GPU - if len(aggs) > 1: + ret = aggs[0] + for i in range(1, len(aggs)): # Works with numpy or cupy arrays - np.maximum(aggs[0], aggs[1], out=aggs[0]) - return aggs[0] + np.maximum(ret, aggs[i], out=ret) + return ret class _min_row_index(_max_or_min_row_index): @@ -2245,9 +2259,9 @@ def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = Fal def _combine(aggs): # Minimum ignoring -1 values ret = aggs[0] - if len(aggs) > 1: + for i in range(1, len(aggs)): # Can take 2d (ny, nx) or 3d (ny, nx, ncat) arrays. - row_min_in_place(aggs[0], aggs[1]) + row_min_in_place(ret, aggs[i]) return ret @staticmethod diff --git a/datashader/tests/test_xarray.py b/datashader/tests/test_xarray.py index 26b59e1d4..8397d1bc2 100644 --- a/datashader/tests/test_xarray.py +++ b/datashader/tests/test_xarray.py @@ -1,11 +1,21 @@ from __future__ import annotations import numpy as np +from numpy import nan +import os import xarray as xr import datashader as ds +from datashader.tests.test_pandas import assert_eq_ndarray import pytest +try: + import cupy +except ImportError: + cupy = None + +test_gpu = bool(int(os.getenv("DATASHADER_TEST_GPU", 0))) + xda = xr.DataArray(data=np.array(([1.] * 10 + [10] * 10)), dims=('record'), @@ -54,3 +64,150 @@ def test_count(source): assert_eq(agg, out) np.testing.assert_array_almost_equal(agg.x_range, (0, 1)) np.testing.assert_array_almost_equal(agg.y_range, (0, 1)) + + +x = np.arange(5) +channel = np.arange(2) +value = [-33, -55] +other = [2.2, 1.1] +data = np.array([[2, 1, 0, 1, 2], [1, 1, 1, 1, 1]], dtype=np.float64) +ds2d_x0 = xr.Dataset( + data_vars=dict( + name=(("x", "channel"), data.T.copy()), + value=("channel", value), + other=("channel", other), + ), + coords=dict( + channel=("channel", channel), + x=("x", x), + ), +) +ds2d_x1 = xr.Dataset( + data_vars=dict( + name=(("channel", "x"), data), + value=("channel", value), + other=("channel", other), + ), + coords=dict( + channel=("channel", channel), + x=("x", x), + ), +) +ds2ds = [ds2d_x0, ds2d_x1] + + +@pytest.mark.parametrize("ds2d", ds2ds) +@pytest.mark.parametrize("cuda", [False, True]) +@pytest.mark.parametrize("chunksizes", [ + None, + dict(x=10, channel=10), + dict(x=10, channel=1), + dict(x=3, channel=10), + dict(x=3, channel=1), +]) +def test_lines_xarray_common_x(ds2d, cuda, chunksizes): + source = ds2d.copy() + if cuda: + if not (cupy and test_gpu): + pytest.skip("CUDA tests not requested") + elif chunksizes is not None: + pytest.skip("CUDA-dask for LinesXarrayCommonX not implemented") + + # CPU -> GPU + source.name.data = cupy.asarray(source.name.data) + + if chunksizes is not None: + source = source.chunk(chunksizes) + + canvas = ds.Canvas(plot_height=3, plot_width=7) + + # Expected solutions + sol_count = np.array( + [[0, 0, 1, 1, 0, 0, 0], [1, 2, 1, 1, 2, 2, 1], [1, 0, 0, 0, 0, 0, 1]], + dtype=np.uint32) + sol_max = np.array( + [[nan, nan, -33, -33, nan, nan, nan], [-55, -33, -55, -55, -33, -33, -55], [-33, nan, nan, nan, nan, nan, -33]], # noqa: E501 + dtype=np.float64) + sol_min = np.array( + [[nan, nan, -33, -33, nan, nan, nan], [-55, -55, -55, -55, -55, -55, -55], [-33, nan, nan, nan, nan, nan, -33]], # noqa: E501 + dtype=np.float64) + sol_sum = np.array( + [[nan, nan, -33, -33, nan, nan, nan], [-55, -88, -55, -55, -88, -88, -55], [-33, nan, nan, nan, nan, nan, -33]], # noqa: E501 + dtype=np.float64) + sol_max_row_index = np.array( + [[-1, -1, 0, 0, -1, -1, -1], [1, 1, 1, 1, 1, 1, 1], [0, -1, -1, -1, -1, -1, 0]], + dtype=np.int64) + sol_min_row_index = np.array( + [[-1, -1, 0, 0, -1, -1, -1], [1, 0, 1, 1, 0, 0, 1], [0, -1, -1, -1, -1, -1, 0]], + dtype=np.int64) + + if chunksizes is not None and chunksizes["x"] == 3: + # Dask chunking in x-direction gives different (incorrect) results. + sol_count[:, 4] = 0 + sol_max[:, 4] = nan + sol_min[:, 4] = nan + sol_sum[:, 4] = nan + sol_max_row_index[:, 4] = -1 + sol_min_row_index[:, 4] = -1 + + sol_first = np.select([sol_min_row_index==0, sol_min_row_index==1], value, np.nan) + sol_last = np.select([sol_max_row_index==0, sol_max_row_index==1], value, np.nan) + sol_where_max_other = np.select([sol_max==-33, sol_max==-55], other, np.nan) + sol_where_max_row = np.select([sol_max==-33, sol_max==-55], [0, 1], -1) + sol_where_min_other = np.select([sol_min==-33, sol_min==-55], other, np.nan) + sol_where_min_row = np.select([sol_min==-33, sol_min==-55], [0, 1], -1) + + # count + agg = canvas.line(source, x="x", y="name", agg=ds.count()) + assert_eq_ndarray(agg.x_range, (0, 4), close=True) + assert_eq_ndarray(agg.y_range, (0, 2), close=True) + assert_eq_ndarray(agg.data, sol_count) + assert isinstance(agg.data, cupy.ndarray if cuda else np.ndarray) + + # any + agg = canvas.line(source, x="x", y="name", agg=ds.any()) + assert_eq_ndarray(agg.data, sol_count > 0) + + # max + agg = canvas.line(source, x="x", y="name", agg=ds.max("value")) + assert_eq_ndarray(agg.data, sol_max) + + # min + agg = canvas.line(source, x="x", y="name", agg=ds.min("value")) + assert_eq_ndarray(agg.data, sol_min) + + # sum + agg = canvas.line(source, x="x", y="name", agg=ds.sum("value")) + assert_eq_ndarray(agg.data, sol_sum) + + # _max_row_index + agg = canvas.line(source, x="x", y="name", agg=ds._max_row_index()) + assert_eq_ndarray(agg.data, sol_max_row_index) + + # _min_row_index + agg = canvas.line(source, x="x", y="name", agg=ds._min_row_index()) + assert_eq_ndarray(agg.data, sol_min_row_index) + + # first + agg = canvas.line(source, x="x", y="name", agg=ds.first("value")) + assert_eq_ndarray(agg.data, sol_first) + + # last + agg = canvas.line(source, x="x", y="name", agg=ds.last("value")) + assert_eq_ndarray(agg.data, sol_last) + + # where(max) returning other row + agg = canvas.line(source, x="x", y="name", agg=ds.where(ds.max("value"), "other")) + assert_eq_ndarray(agg.data, sol_where_max_other) + + # where(max) returning row index + agg = canvas.line(source, x="x", y="name", agg=ds.where(ds.max("value"))) + assert_eq_ndarray(agg.data, sol_where_max_row) + + # where(min) returning other row + agg = canvas.line(source, x="x", y="name", agg=ds.where(ds.min("value"), "other")) + assert_eq_ndarray(agg.data, sol_where_min_other) + + # where(min) returning row index + agg = canvas.line(source, x="x", y="name", agg=ds.where(ds.min("value"))) + assert_eq_ndarray(agg.data, sol_where_min_row)