Skip to content

Commit

Permalink
Implement lines using 2D xarray with common x coordinates (#1282)
Browse files Browse the repository at this point in the history
* Implement lines using 2D xarray with common x coordinates

* Don't convert to cupy array if already are

* Cleanup object creation
  • Loading branch information
ianthomas23 authored Oct 25, 2023
1 parent d338d98 commit 0af4cd6
Show file tree
Hide file tree
Showing 10 changed files with 404 additions and 43 deletions.
6 changes: 3 additions & 3 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti

create = make_create(bases, dshapes, cuda)
append, any_uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias)
info = make_info(cols, any_uses_cuda_mutex)
info = make_info(cols, cuda, any_uses_cuda_mutex)
combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
finalize = make_finalize(bases, agg, schema, cuda, partitioned)

Expand Down Expand Up @@ -302,9 +302,9 @@ def make_create(bases, dshapes, cuda):
return lambda shape: tuple(c(shape, array_module) for c in creators)


def make_info(cols, uses_cuda_mutex: bool):
def make_info(cols, cuda, uses_cuda_mutex: bool):
def info(df, canvas_shape):
ret = tuple(c.apply(df) for c in cols)
ret = tuple(c.apply(df, cuda) for c in cols)
if uses_cuda_mutex:
import cupy # Guaranteed to be available if uses_cuda_mutex is True
import numba
Expand Down
16 changes: 15 additions & 1 deletion datashader/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None,
"""
from .glyphs import (LineAxis0, LinesAxis1, LinesAxis1XConstant,
LinesAxis1YConstant, LineAxis0Multi,
LinesAxis1Ragged, LineAxis1Geometry)
LinesAxis1Ragged, LineAxis1Geometry, LinesXarrayCommonX)

validate_xy_or_geometry('Line', x, y, geometry)

Expand Down Expand Up @@ -383,6 +383,20 @@ def line(self, source, x=None, y=None, agg=None, axis=0, geometry=None,
"dask_geopandas.GeoDataFrame. Received objects of type {typ}".format(
typ=type(source)))

elif isinstance(source, Dataset) and isinstance(x, str) and isinstance(y, str):
x_arr = source[x]
y_arr = source[y]
if x_arr.ndim != 1:
raise ValueError(f"x array must have 1 dimension not {x_arr.ndim}")

if y_arr.ndim != 2:
raise ValueError(f"y array must have 2 dimensions not {y_arr.ndim}")
if x not in y_arr.dims:
raise ValueError("x must be one of the coordinate dimensions of y")

y_coord_dims = list(y_arr.coords.dims)
x_dim_index = y_coord_dims.index(x)
glyph = LinesXarrayCommonX(x, y, x_dim_index)
else:
# Broadcast column specifications to handle cases where
# x is a list and y is a string or vice versa
Expand Down
77 changes: 77 additions & 0 deletions datashader/data_libraries/dask_xarray.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datashader.compiler import compile_components
from datashader.utils import Dispatcher
from datashader.glyphs.line import LinesXarrayCommonX
from datashader.glyphs.quadmesh import (
QuadMeshRaster, QuadMeshRectilinear, QuadMeshCurvilinear, build_scale_translate
)
Expand Down Expand Up @@ -349,6 +350,82 @@ def chunk(np_zs, np_x_centers, np_y_centers):
return dsk, result_name


def dask_xarray_lines(
glyph: LinesXarrayCommonX, xr_ds: xr.Dataset, schema, canvas, summary,
*, antialias=False, cuda=False,
):
shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)

# Compile functions
create, info, append, combine, finalize, antialias_stage_2, antialias_stage_2_funcs, \
column_names = compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda,
partitioned=True)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(
x_mapper, y_mapper, info, append, antialias_stage_2, antialias_stage_2_funcs)
x_range = bounds[:2]
y_range = bounds[2:]

x_name = glyph.x
x_dim_index = glyph.x_dim_index
other_dim_index = 1 - x_dim_index
other_dim_name = xr_ds[glyph.y].coords.dims[other_dim_index]
xs = xr_ds[x_name]

# Build chunk offsets for coordinates
chunk_offsets = {}
for k, chunks in xr_ds.chunks.items():
chunk_offsets[k] = [0] + list(np.cumsum(chunks))

partitioned = True
uses_row_index = summary.uses_row_index(cuda, partitioned)

def chunk(np_array, *chunk_indices):
aggs = create(shape)

start_x_index = chunk_offsets[x_name][chunk_indices[x_dim_index]]
end_x_index = start_x_index + np_array.shape[x_dim_index]
x = xs[start_x_index:end_x_index].values

start_other_index = chunk_offsets[other_dim_name][chunk_indices[other_dim_index]]
end_other_index = start_other_index + np_array.shape[other_dim_index]

data_vars = dict(
name=(("x", other_dim_name) if x_dim_index == 0 else (other_dim_name, "x"), np_array),
)
# Other required columns are chunked in the other_dim
for column_name in column_names:
values = xr_ds[column_name][start_other_index:end_other_index].values
data_vars[column_name] = (other_dim_name, values)

chunk_ds = xr.Dataset(
data_vars=data_vars,
coords=dict(
x=("x", x),
other_dim_name=(other_dim_name, np.arange(start_other_index, end_other_index)),
),
)

if uses_row_index:
row_offset = start_other_index
chunk_ds.attrs["_datashader_row_offset"] = row_offset
chunk_ds.attrs["_datashader_row_length"] = end_other_index - start_other_index

extend(aggs, chunk_ds, st, bounds)
return aggs

name = tokenize(xr_ds.__dask_tokenize__(), canvas, glyph, summary)
keys = [k for row in xr_ds.__dask_keys__()[0] for k in row]
keys2 = [(name, i) for i in range(len(keys))]
dsk = dict((k2, (chunk, k, k[1], k[2])) for (k2, k) in zip(keys2, keys))
dsk[name] = (apply, finalize, [(combine, keys2)],
dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label],
attrs=dict(x_range=x_range, y_range=y_range)))
return dsk, name


dask_glyph_dispatch.register(QuadMeshRectilinear)(dask_rectilinear)
dask_glyph_dispatch.register(QuadMeshRaster)(dask_raster)
dask_glyph_dispatch.register(QuadMeshCurvilinear)(dask_curvilinear)
dask_glyph_dispatch.register(LinesXarrayCommonX)(dask_xarray_lines)
10 changes: 10 additions & 0 deletions datashader/data_libraries/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datashader.compiler import compile_components
from datashader.glyphs.points import _PointLike, _GeometryLike
from datashader.glyphs.area import _AreaToLineLike
from datashader.glyphs.line import LinesXarrayCommonX
from datashader.utils import Dispatcher

__all__ = ()
Expand Down Expand Up @@ -46,6 +47,15 @@ def default(glyph, source, schema, canvas, summary, *, antialias=False, cuda=Fal

bases = create((height, width))

if isinstance(glyph, LinesXarrayCommonX) and summary.uses_row_index(cuda, partitioned=False):
# Need to use a row index and extract.apply() doesn't have enough
# information to determine the coordinate length itself so do so here
# and pass it along as an xarray attribute in the usual manner.
other_dim_index = 1 - glyph.x_dim_index
other_dim_name = source[glyph.y].coords.dims[other_dim_index]
length = len(source[other_dim_name])
source = source.assign_attrs(_datashader_row_offset=0, _datashader_row_length=length)

extend(bases, source, x_st + y_st, x_range + y_range)

return finalize(bases,
Expand Down
10 changes: 9 additions & 1 deletion datashader/data_libraries/xarray.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
from datashader.glyphs.line import LinesXarrayCommonX
from datashader.glyphs.quadmesh import _QuadMeshLike
from datashader.data_libraries.pandas import default
from datashader.core import bypixel
Expand All @@ -16,7 +17,13 @@

@bypixel.pipeline.register(xr.Dataset)
def xarray_pipeline(xr_ds, schema, canvas, glyph, summary, *, antialias=False):
cuda = cupy and isinstance(xr_ds[glyph.name].data, cupy.ndarray)
cuda = False
if cupy:
if isinstance(glyph, LinesXarrayCommonX):
cuda = isinstance(xr_ds[glyph.y].data, cupy.ndarray)
else:
cuda = isinstance(xr_ds[glyph.name].data, cupy.ndarray)

if not xr_ds.chunks:
return glyph_dispatch(
glyph, xr_ds, schema, canvas, summary, antialias=antialias, cuda=cuda)
Expand All @@ -28,3 +35,4 @@ def xarray_pipeline(xr_ds, schema, canvas, glyph, summary, *, antialias=False):

# Default to default pandas implementation
glyph_dispatch.register(_QuadMeshLike)(default)
glyph_dispatch.register(LinesXarrayCommonX)(default)
1 change: 1 addition & 0 deletions datashader/glyphs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
LinesAxis1Ragged,
LineAxis1Geometry,
LineAxis1GeoPandas,
LinesXarrayCommonX,
)
from .area import ( # noqa (API import)
AreaToZeroAxis0,
Expand Down
6 changes: 6 additions & 0 deletions datashader/glyphs/glyph.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import numpy as np
import pandas as pd
import xarray as xr

from datashader.utils import Expr, ngjit
from datashader.macros import expand_varargs
Expand Down Expand Up @@ -54,6 +55,11 @@ def _compute_bounds(s):
return (s.min(), s.max())
elif isinstance(s, pd.Series):
return Glyph._compute_bounds_numba(s.values)
elif isinstance(s, xr.DataArray):
if cp and isinstance(s.data, cp.ndarray):
return (s.min().item(), s.max().item())
else:
return Glyph._compute_bounds_numba(s.values.ravel())
else:
return Glyph._compute_bounds_numba(s)

Expand Down
106 changes: 90 additions & 16 deletions datashader/glyphs/line.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ..transfer_functions._cuda_utils import cuda_args
except ImportError:
cudf = None
cp = None
cuda_args = None

try:
Expand Down Expand Up @@ -607,7 +608,78 @@ def extend(aggs, df, vt, bounds, plot_start=True):

perform_extend_cpu(
sx, tx, sy, ty, xmin, xmax, ymin, ymax,
geom_array, antialias_stage_2, *aggs_and_cols
geom_array, antialias_stage_2, *aggs_and_cols,
)

return extend


class LinesXarrayCommonX(LinesAxis1):
def __init__(self, x, y, x_dim_index: int):
super().__init__(x, y)
self.x_dim_index = x_dim_index

def __hash__(self):
# This ensures that @memoize below caches different functions for different x_dim_index.
return hash((type(self), self.x_dim_index))

def compute_x_bounds(self, dataset):
bounds = self._compute_bounds(dataset[self.x])
return self.maybe_expand_bounds(bounds)

def compute_y_bounds(self, dataset):
bounds = self._compute_bounds(dataset[self.y])
return self.maybe_expand_bounds(bounds)

def compute_bounds_dask(self, xr_ds):
return self.compute_x_bounds(xr_ds), self.compute_y_bounds(xr_ds)

def validate(self, in_dshape):
if not isreal(in_dshape.measure[str(self.x)]):
raise ValueError('x column must be real')

if not isreal(in_dshape.measure[str(self.y)]):
raise ValueError('y column must be real')

@memoize
def _internal_build_extend(
self, x_mapper, y_mapper, info, append, line_width, antialias_stage_2,
antialias_stage_2_funcs,
):
expand_aggs_and_cols = self.expand_aggs_and_cols(append)
draw_segment, antialias_stage_2_funcs = _line_internal_build_extend(
x_mapper, y_mapper, append, line_width, antialias_stage_2, antialias_stage_2_funcs,
expand_aggs_and_cols,
)
swap_dims = self.x_dim_index == 0
extend_cpu, extend_cuda = _build_extend_line_axis1_x_constant(
draw_segment, expand_aggs_and_cols, antialias_stage_2_funcs, swap_dims,
)

x_name = self.x
y_name = self.y

def extend(aggs, df, vt, bounds, plot_start=True):
sx, tx, sy, ty = vt
xmin, xmax, ymin, ymax = bounds
aggs_and_cols = aggs + info(df, aggs[0].shape[:2])

if cudf and isinstance(df, cudf.DataFrame):
xs = cp.asarray(df[x_name])
ys = cp.asarray(df[y_name])
do_extend = extend_cuda[cuda_args(ys.shape)]
elif cp and isinstance(df[y_name].data, cp.ndarray):
xs = cp.asarray(df[x_name])
ys = df[y_name].data
shape = ys.shape[::-1] if swap_dims else ys.shape
do_extend = extend_cuda[cuda_args(shape)]
else:
xs = df[x_name].to_numpy()
ys = df[y_name].to_numpy()
do_extend = extend_cpu

do_extend(
sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2, *aggs_and_cols
)

return extend
Expand Down Expand Up @@ -1263,7 +1335,7 @@ def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_


def _build_extend_line_axis1_x_constant(draw_segment, expand_aggs_and_cols,
antialias_stage_2_funcs):
antialias_stage_2_funcs, swap_dims: bool = False):
if antialias_stage_2_funcs is not None:
aa_stage_2_accumulate, aa_stage_2_clear, aa_stage_2_copy_back = antialias_stage_2_funcs
use_2_stage_agg = antialias_stage_2_funcs is not None
Expand All @@ -1274,22 +1346,24 @@ def perform_extend_line(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols
):
x0 = xs[j]
y0 = ys[i, j]
x1 = xs[j + 1]
y1 = ys[i, j + 1]

segment_start = (
(j == 0) or isnull(xs[j - 1]) or isnull(ys[i, j - 1])
)

segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[i, j+2])
if swap_dims:
y0 = ys[j, i]
y1 = ys[j + 1, i]
segment_start = (j == 0) or isnull(xs[j - 1]) or isnull(ys[j - 1, i])
segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[j+2, i])
else:
y0 = ys[i, j]
y1 = ys[i, j + 1]
segment_start = (j == 0) or isnull(xs[j - 1]) or isnull(ys[i, j - 1])
segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[i, j+2])

if segment_start or use_2_stage_agg:
xm = 0.0
ym = 0.0
else:
xm = xs[j-1]
ym = ys[i, j-1]
ym = ys[j-1, i] if swap_dims else ys[i, j-1]

draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax,
segment_start, segment_end, x0, x1, y0, y1,
Expand All @@ -1301,8 +1375,8 @@ def extend_cpu(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2
*aggs_and_cols):
antialias = antialias_stage_2 is not None
buffer = np.empty(8) if antialias else None
ncols = ys.shape[1]
for i in range(ys.shape[0]):
ncols, nrows = ys.shape if swap_dims else ys.shape[::-1]
for i in range(nrows):
for j in range(ncols - 1):
perform_extend_line(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols
Expand Down Expand Up @@ -1347,10 +1421,10 @@ def extend_cuda(sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_
antialias = antialias_stage_2 is not None
buffer = cuda.local.array(8, nb_types.float64) if antialias else None
i, j = cuda.grid(2)
if i < ys.shape[0] and j < ys.shape[1] - 1:
ncols, nrows = ys.shape if swap_dims else ys.shape[::-1]
if i < nrows and j < ncols - 1:
perform_extend_line(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys,
buffer, *aggs_and_cols
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols
)

if use_2_stage_agg:
Expand Down
Loading

0 comments on commit 0af4cd6

Please sign in to comment.