Skip to content

Commit

Permalink
Support reductions using other columns and row index
Browse files Browse the repository at this point in the history
  • Loading branch information
ianthomas23 committed Sep 27, 2023
1 parent d1b3d34 commit 144f7dc
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 23 deletions.
19 changes: 16 additions & 3 deletions datashader/data_libraries/dask_xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,27 +368,35 @@ def dask_xarray_lines(

x_name = glyph.x
x_dim_index = glyph.x_dim_index
other_dim_name = xr_ds[glyph.y].coords.dims[1 - x_dim_index]
other_dim_index = 1 - x_dim_index
other_dim_name = xr_ds[glyph.y].coords.dims[other_dim_index]
xs = xr_ds[x_name]

# Build chunk offsets for coordinates
chunk_offsets = {}
for k, chunks in xr_ds.chunks.items():
chunk_offsets[k] = [0] + list(np.cumsum(chunks))

partitioned = True
uses_row_index = summary.uses_row_index(cuda, partitioned)

def chunk(np_array, *chunk_indices):
aggs = create(shape)

start_x_index = chunk_offsets[x_name][chunk_indices[x_dim_index]]
end_x_index = start_x_index + np_array.shape[x_dim_index]
x = xs[start_x_index:end_x_index].values

start_other_index = chunk_offsets[other_dim_name][chunk_indices[1 - x_dim_index]]
end_other_index = start_other_index + np_array.shape[1 - x_dim_index]
start_other_index = chunk_offsets[other_dim_name][chunk_indices[other_dim_index]]
end_other_index = start_other_index + np_array.shape[other_dim_index]

data_vars = dict(
name=(("x", other_dim_name) if x_dim_index == 0 else (other_dim_name, "x"), np_array),
)
# Other required columns are chunked in the other_dim
for column_name in column_names:
values = xr_ds[column_name][start_other_index:end_other_index].values
data_vars[column_name] = (other_dim_name, values)

chunk_ds = xr.Dataset(
data_vars=data_vars,
Expand All @@ -398,6 +406,11 @@ def chunk(np_array, *chunk_indices):
),
)

if uses_row_index:
row_offset = start_other_index
chunk_ds.attrs["_datashader_row_offset"] = row_offset
chunk_ds.attrs["_datashader_row_length"] = end_other_index - start_other_index

extend(aggs, chunk_ds, st, bounds)
return aggs

Expand Down
10 changes: 10 additions & 0 deletions datashader/data_libraries/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datashader.compiler import compile_components
from datashader.glyphs.points import _PointLike, _GeometryLike
from datashader.glyphs.area import _AreaToLineLike
from datashader.glyphs.line import LinesXarrayCommonX
from datashader.utils import Dispatcher

__all__ = ()
Expand Down Expand Up @@ -45,6 +46,15 @@ def default(glyph, source, schema, canvas, summary, *, antialias=False, cuda=Fal

bases = create((height, width))

if isinstance(glyph, LinesXarrayCommonX) and summary.uses_row_index(cuda, partitioned=False):
# Need to use a row index and extract.apply() doesn't have enough
# information to determine the coordinate length itself so do so here
# and pass it along as an xarray attribute in the usual manner.
other_dim_index = 1 - glyph.x_dim_index
other_dim_name = source[glyph.y].coords.dims[other_dim_index]
length = len(source[other_dim_name])
source = source.assign_attrs(_datashader_row_offset=0, _datashader_row_length=length)

extend(bases, source, x_st + y_st, x_range + y_range)

return finalize(bases,
Expand Down
1 change: 0 additions & 1 deletion datashader/glyphs/line.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,6 @@ def perform_extend_line(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols
):
x0 = xs[j]
y0 = ys[i, j]
x1 = xs[j + 1]
if swap_dims:
y0 = ys[j, i]
Expand Down
33 changes: 21 additions & 12 deletions datashader/reductions.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,18 @@ class extract(Preprocess):
"""Extract a column from a dataframe as a numpy array of values."""
def apply(self, df):
if self.column is SpecialColumn.RowIndex:
attrs = getattr(df, "attrs", None)
row_offset = getattr(attrs or df, "_datashader_row_offset", 0)
attr_name = "_datashader_row_offset"
if isinstance(df, xr.Dataset):
row_offset = df.attrs[attr_name]
row_length = df.attrs["_datashader_row_length"]
else:
attrs = getattr(df, "attrs", None)
row_offset = getattr(attrs or df, attr_name, 0)
row_length = len(df)

if cudf and isinstance(df, cudf.DataFrame):
if self.column is SpecialColumn.RowIndex:
return cp.arange(row_offset, row_offset+len(df), dtype=np.int64)
return cp.arange(row_offset, row_offset + row_length, dtype=np.int64)

Check warning on line 100 in datashader/reductions.py

View check run for this annotation

Codecov / codecov/patch

datashader/reductions.py#L100

Added line #L100 was not covered by tests

if df[self.column].dtype.kind == 'f':
nullval = np.nan
Expand All @@ -100,11 +106,11 @@ def apply(self, df):
if Version(cudf.__version__) >= Version("22.02"):
return df[self.column].to_cupy(na_value=nullval)
return cp.array(df[self.column].to_gpu_array(fillna=nullval))
elif self.column is SpecialColumn.RowIndex:
return np.arange(row_offset, row_offset + row_length, dtype=np.int64)
elif isinstance(df, xr.Dataset):
# DataArray could be backed by numpy or cupy array
return df[self.column].data
elif self.column is SpecialColumn.RowIndex:
return np.arange(row_offset, row_offset+len(df), dtype=np.int64)
else:
return df[self.column].values

Expand Down Expand Up @@ -2016,9 +2022,11 @@ def wrapped_combine(aggs, selector_aggs):
elif cuda:
is_n_reduction = isinstance(self.selector, FloatingNReduction)
shape = aggs[0].shape[:-1] if is_n_reduction else aggs[0].shape
combine[cuda_args(shape)](aggs, selector_aggs)
for i in range(1, len(aggs)):
combine[cuda_args(shape)]((aggs[0], aggs[i]), (selector_aggs[0], selector_aggs[i]))

Check warning on line 2026 in datashader/reductions.py

View check run for this annotation

Codecov / codecov/patch

datashader/reductions.py#L2025-L2026

Added lines #L2025 - L2026 were not covered by tests
else:
combine(aggs, selector_aggs)
for i in range(1, len(aggs)):
combine((aggs[0], aggs[i]), (selector_aggs[0], selector_aggs[i]))

return aggs[0], selector_aggs[0]

Expand Down Expand Up @@ -2173,10 +2181,11 @@ def _append_cuda(x, y, agg, field):
def _combine(aggs):
# Maximum ignoring -1 values
# Works for CPU and GPU
if len(aggs) > 1:
ret = aggs[0]
for i in range(1, len(aggs)):
# Works with numpy or cupy arrays
np.maximum(aggs[0], aggs[1], out=aggs[0])
return aggs[0]
np.maximum(ret, aggs[i], out=ret)
return ret


class _min_row_index(_max_or_min_row_index):
Expand Down Expand Up @@ -2236,9 +2245,9 @@ def _build_combine(self, dshape, antialias, cuda, partitioned, categorical = Fal
def _combine(aggs):
# Minimum ignoring -1 values
ret = aggs[0]
if len(aggs) > 1:
for i in range(1, len(aggs)):
# Can take 2d (ny, nx) or 3d (ny, nx, ncat) arrays.
row_min_in_place(aggs[0], aggs[1])
row_min_in_place(ret, aggs[i])
return ret

@staticmethod
Expand Down
71 changes: 64 additions & 7 deletions datashader/tests/test_xarray.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations
import numpy as np
from numpy import nan
import xarray as xr

import datashader as ds
Expand Down Expand Up @@ -85,7 +86,11 @@ def test_count(source):

@pytest.mark.parametrize("ds2d", ds2ds)
@pytest.mark.parametrize("chunksizes", [
None, dict(x=10, channel=10), dict(x=10, channel=1), dict(x=3, channel=10),
None,
dict(x=10, channel=10),
dict(x=10, channel=1),
dict(x=3, channel=10),
dict(x=3, channel=1),
])
def test_lines_xarray_common_x(ds2d, chunksizes):
if chunksizes is None:
Expand All @@ -95,15 +100,39 @@ def test_lines_xarray_common_x(ds2d, chunksizes):

canvas = ds.Canvas(plot_height=3, plot_width=7)

# count
sol_count = np.array([
[0, 0, 1, 1, 0, 0, 0],
[1, 2, 1, 1, 2, 2, 1],
[1, 0, 0, 0, 0, 0, 1],
], dtype=np.uint32)
# Expected solutions
sol_count = np.array(
[[0, 0, 1, 1, 0, 0, 0], [1, 2, 1, 1, 2, 2, 1], [1, 0, 0, 0, 0, 0, 1]],
dtype=np.uint32)
sol_max = np.array(
[[nan, nan, -33, -33, nan, nan, nan], [-55, -33, -55, -55, -33, -33, -55], [-33, nan, nan, nan, nan, nan, -33]],
dtype=np.float64)
sol_min = np.array(
[[nan, nan, -33, -33, nan, nan, nan], [-55, -55, -55, -55, -55, -55, -55], [-33, nan, nan, nan, nan, nan, -33]],
dtype=np.float64)
sol_sum = np.array(
[[nan, nan, -33, -33, nan, nan, nan], [-55, -88, -55, -55, -88, -88, -55], [-33, nan, nan, nan, nan, nan, -33]],
dtype=np.float64)
sol_max_row_index = np.array(
[[-1, -1, 0, 0, -1, -1, -1], [1, 1, 1, 1, 1, 1, 1], [0, -1, -1, -1, -1, -1, 0]],
dtype=np.int64)
sol_min_row_index = np.array(
[[-1, -1, 0, 0, -1, -1, -1], [1, 0, 1, 1, 0, 0, 1], [0, -1, -1, -1, -1, -1, 0]],
dtype=np.int64)

if chunksizes is not None and chunksizes["x"] == 3:
# Dask chunking in x-direction gives different (incorrect) results.
sol_count[:, 4] = 0
sol_max[:, 4] = nan
sol_min[:, 4] = nan
sol_sum[:, 4] = nan
sol_max_row_index[:, 4] = -1
sol_min_row_index[:, 4] = -1

sol_first = np.select([sol_min_row_index==0, sol_min_row_index==1], value, np.nan)
sol_last = np.select([sol_max_row_index==0, sol_max_row_index==1], value, np.nan)

# count
agg = canvas.line(source, x="x", y="name", agg=ds.count())
assert_eq_ndarray(agg.x_range, (0, 4), close=True)
assert_eq_ndarray(agg.y_range, (0, 2), close=True)
Expand All @@ -112,3 +141,31 @@ def test_lines_xarray_common_x(ds2d, chunksizes):
# any
agg = canvas.line(source, x="x", y="name", agg=ds.any())
assert_eq_ndarray(agg.data, sol_count > 0)

# max
agg = canvas.line(source, x="x", y="name", agg=ds.max("value"))
assert_eq_ndarray(agg.data, sol_max)

# min
agg = canvas.line(source, x="x", y="name", agg=ds.min("value"))
assert_eq_ndarray(agg.data, sol_min)

# sum
agg = canvas.line(source, x="x", y="name", agg=ds.sum("value"))
assert_eq_ndarray(agg.data, sol_sum)

# _max_row_index
agg = canvas.line(source, x="x", y="name", agg=ds._max_row_index())
assert_eq_ndarray(agg.data, sol_max_row_index)

# _min_row_index
agg = canvas.line(source, x="x", y="name", agg=ds._min_row_index())
assert_eq_ndarray(agg.data, sol_min_row_index)

# first
agg = canvas.line(source, x="x", y="name", agg=ds.first("value"))
assert_eq_ndarray(agg.data, sol_first)

# last
agg = canvas.line(source, x="x", y="name", agg=ds.last("value"))
assert_eq_ndarray(agg.data, sol_last)

0 comments on commit 144f7dc

Please sign in to comment.