Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement lines using 2D xarray with common x coordinates #1282

Merged
merged 3 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions datashader/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def compile_components(agg, schema, glyph, *, antialias=False, cuda=False, parti

create = make_create(bases, dshapes, cuda)
append, any_uses_cuda_mutex = make_append(bases, cols, calls, glyph, antialias)
info = make_info(cols, any_uses_cuda_mutex)
info = make_info(cols, cuda, any_uses_cuda_mutex)
combine = make_combine(bases, dshapes, temps, combine_temps, antialias, cuda, partitioned)
finalize = make_finalize(bases, agg, schema, cuda, partitioned)

Expand Down Expand Up @@ -302,9 +302,9 @@ def make_create(bases, dshapes, cuda):
return lambda shape: tuple(c(shape, array_module) for c in creators)


def make_info(cols, uses_cuda_mutex: bool):
def make_info(cols, cuda, uses_cuda_mutex: bool):
def info(df, canvas_shape):
ret = tuple(c.apply(df) for c in cols)
ret = tuple(c.apply(df, cuda) for c in cols)
if uses_cuda_mutex:
import cupy # Guaranteed to be available if uses_cuda_mutex is True
import numba
Expand Down
16 changes: 15 additions & 1 deletion datashader/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@
"""
from .glyphs import (LineAxis0, LinesAxis1, LinesAxis1XConstant,
LinesAxis1YConstant, LineAxis0Multi,
LinesAxis1Ragged, LineAxis1Geometry)
LinesAxis1Ragged, LineAxis1Geometry, LinesXarrayCommonX)

validate_xy_or_geometry('Line', x, y, geometry)

Expand Down Expand Up @@ -383,6 +383,20 @@
"dask_geopandas.GeoDataFrame. Received objects of type {typ}".format(
typ=type(source)))

elif isinstance(source, Dataset) and isinstance(x, str) and isinstance(y, str):
x_arr = source[x]
y_arr = source[y]
if x_arr.ndim != 1:
raise ValueError(f"x array must have 1 dimension not {x_arr.ndim}")

Check warning on line 390 in datashader/core.py

View check run for this annotation

Codecov / codecov/patch

datashader/core.py#L390

Added line #L390 was not covered by tests

if y_arr.ndim != 2:
raise ValueError(f"y array must have 2 dimensions not {y_arr.ndim}")

Check warning on line 393 in datashader/core.py

View check run for this annotation

Codecov / codecov/patch

datashader/core.py#L393

Added line #L393 was not covered by tests
if x not in y_arr.dims:
raise ValueError("x must be one of the coordinate dimensions of y")

Check warning on line 395 in datashader/core.py

View check run for this annotation

Codecov / codecov/patch

datashader/core.py#L395

Added line #L395 was not covered by tests

y_coord_dims = list(y_arr.coords.dims)
x_dim_index = y_coord_dims.index(x)
glyph = LinesXarrayCommonX(x, y, x_dim_index)
else:
# Broadcast column specifications to handle cases where
# x is a list and y is a string or vice versa
Expand Down
77 changes: 77 additions & 0 deletions datashader/data_libraries/dask_xarray.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datashader.compiler import compile_components
from datashader.utils import Dispatcher
from datashader.glyphs.line import LinesXarrayCommonX
from datashader.glyphs.quadmesh import (
QuadMeshRaster, QuadMeshRectilinear, QuadMeshCurvilinear, build_scale_translate
)
Expand Down Expand Up @@ -349,6 +350,82 @@ def chunk(np_zs, np_x_centers, np_y_centers):
return dsk, result_name


def dask_xarray_lines(
glyph: LinesXarrayCommonX, xr_ds: xr.Dataset, schema, canvas, summary,
*, antialias=False, cuda=False,
):
shape, bounds, st, axis = shape_bounds_st_and_axis(xr_ds, canvas, glyph)

# Compile functions
create, info, append, combine, finalize, antialias_stage_2, antialias_stage_2_funcs, \
column_names = compile_components(summary, schema, glyph, antialias=antialias, cuda=cuda,
partitioned=True)
x_mapper = canvas.x_axis.mapper
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(
x_mapper, y_mapper, info, append, antialias_stage_2, antialias_stage_2_funcs)
x_range = bounds[:2]
y_range = bounds[2:]

x_name = glyph.x
x_dim_index = glyph.x_dim_index
other_dim_index = 1 - x_dim_index
other_dim_name = xr_ds[glyph.y].coords.dims[other_dim_index]
xs = xr_ds[x_name]

# Build chunk offsets for coordinates
chunk_offsets = {}
for k, chunks in xr_ds.chunks.items():
chunk_offsets[k] = [0] + list(np.cumsum(chunks))

partitioned = True
uses_row_index = summary.uses_row_index(cuda, partitioned)

def chunk(np_array, *chunk_indices):
aggs = create(shape)

start_x_index = chunk_offsets[x_name][chunk_indices[x_dim_index]]
end_x_index = start_x_index + np_array.shape[x_dim_index]
x = xs[start_x_index:end_x_index].values

start_other_index = chunk_offsets[other_dim_name][chunk_indices[other_dim_index]]
end_other_index = start_other_index + np_array.shape[other_dim_index]

data_vars = dict(
name=(("x", other_dim_name) if x_dim_index == 0 else (other_dim_name, "x"), np_array),
)
# Other required columns are chunked in the other_dim
for column_name in column_names:
values = xr_ds[column_name][start_other_index:end_other_index].values
data_vars[column_name] = (other_dim_name, values)

chunk_ds = xr.Dataset(
data_vars=data_vars,
coords=dict(
x=("x", x),
other_dim_name=(other_dim_name, np.arange(start_other_index, end_other_index)),
),
)

if uses_row_index:
row_offset = start_other_index
chunk_ds.attrs["_datashader_row_offset"] = row_offset
chunk_ds.attrs["_datashader_row_length"] = end_other_index - start_other_index

extend(aggs, chunk_ds, st, bounds)
return aggs

name = tokenize(xr_ds.__dask_tokenize__(), canvas, glyph, summary)
keys = [k for row in xr_ds.__dask_keys__()[0] for k in row]
keys2 = [(name, i) for i in range(len(keys))]
dsk = dict((k2, (chunk, k, k[1], k[2])) for (k2, k) in zip(keys2, keys))
dsk[name] = (apply, finalize, [(combine, keys2)],
dict(cuda=cuda, coords=axis, dims=[glyph.y_label, glyph.x_label],
attrs=dict(x_range=x_range, y_range=y_range)))
return dsk, name


dask_glyph_dispatch.register(QuadMeshRectilinear)(dask_rectilinear)
dask_glyph_dispatch.register(QuadMeshRaster)(dask_raster)
dask_glyph_dispatch.register(QuadMeshCurvilinear)(dask_curvilinear)
dask_glyph_dispatch.register(LinesXarrayCommonX)(dask_xarray_lines)
10 changes: 10 additions & 0 deletions datashader/data_libraries/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from datashader.compiler import compile_components
from datashader.glyphs.points import _PointLike, _GeometryLike
from datashader.glyphs.area import _AreaToLineLike
from datashader.glyphs.line import LinesXarrayCommonX
from datashader.utils import Dispatcher

__all__ = ()
Expand Down Expand Up @@ -46,6 +47,15 @@ def default(glyph, source, schema, canvas, summary, *, antialias=False, cuda=Fal

bases = create((height, width))

if isinstance(glyph, LinesXarrayCommonX) and summary.uses_row_index(cuda, partitioned=False):
# Need to use a row index and extract.apply() doesn't have enough
# information to determine the coordinate length itself so do so here
# and pass it along as an xarray attribute in the usual manner.
other_dim_index = 1 - glyph.x_dim_index
other_dim_name = source[glyph.y].coords.dims[other_dim_index]
length = len(source[other_dim_name])
source = source.assign_attrs(_datashader_row_offset=0, _datashader_row_length=length)

extend(bases, source, x_st + y_st, x_range + y_range)

return finalize(bases,
Expand Down
10 changes: 9 additions & 1 deletion datashader/data_libraries/xarray.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from __future__ import annotations
from datashader.glyphs.line import LinesXarrayCommonX
from datashader.glyphs.quadmesh import _QuadMeshLike
from datashader.data_libraries.pandas import default
from datashader.core import bypixel
Expand All @@ -16,7 +17,13 @@

@bypixel.pipeline.register(xr.Dataset)
def xarray_pipeline(xr_ds, schema, canvas, glyph, summary, *, antialias=False):
cuda = cupy and isinstance(xr_ds[glyph.name].data, cupy.ndarray)
cuda = False
if cupy:
if isinstance(glyph, LinesXarrayCommonX):
cuda = isinstance(xr_ds[glyph.y].data, cupy.ndarray)

Check warning on line 23 in datashader/data_libraries/xarray.py

View check run for this annotation

Codecov / codecov/patch

datashader/data_libraries/xarray.py#L22-L23

Added lines #L22 - L23 were not covered by tests
else:
cuda = isinstance(xr_ds[glyph.name].data, cupy.ndarray)

Check warning on line 25 in datashader/data_libraries/xarray.py

View check run for this annotation

Codecov / codecov/patch

datashader/data_libraries/xarray.py#L25

Added line #L25 was not covered by tests

if not xr_ds.chunks:
return glyph_dispatch(
glyph, xr_ds, schema, canvas, summary, antialias=antialias, cuda=cuda)
Expand All @@ -28,3 +35,4 @@

# Default to default pandas implementation
glyph_dispatch.register(_QuadMeshLike)(default)
glyph_dispatch.register(LinesXarrayCommonX)(default)
1 change: 1 addition & 0 deletions datashader/glyphs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
LinesAxis1Ragged,
LineAxis1Geometry,
LineAxis1GeoPandas,
LinesXarrayCommonX,
)
from .area import ( # noqa (API import)
AreaToZeroAxis0,
Expand Down
6 changes: 6 additions & 0 deletions datashader/glyphs/glyph.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import numpy as np
import pandas as pd
import xarray as xr

from datashader.utils import Expr, ngjit
from datashader.macros import expand_varargs
Expand Down Expand Up @@ -54,6 +55,11 @@
return (s.min(), s.max())
elif isinstance(s, pd.Series):
return Glyph._compute_bounds_numba(s.values)
elif isinstance(s, xr.DataArray):
if cp and isinstance(s.data, cp.ndarray):
return (s.min().item(), s.max().item())

Check warning on line 60 in datashader/glyphs/glyph.py

View check run for this annotation

Codecov / codecov/patch

datashader/glyphs/glyph.py#L60

Added line #L60 was not covered by tests
else:
return Glyph._compute_bounds_numba(s.values.ravel())
else:
return Glyph._compute_bounds_numba(s)

Expand Down
106 changes: 90 additions & 16 deletions datashader/glyphs/line.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from ..transfer_functions._cuda_utils import cuda_args
except ImportError:
cudf = None
cp = None
cuda_args = None

try:
Expand Down Expand Up @@ -607,7 +608,78 @@

perform_extend_cpu(
sx, tx, sy, ty, xmin, xmax, ymin, ymax,
geom_array, antialias_stage_2, *aggs_and_cols
geom_array, antialias_stage_2, *aggs_and_cols,
)

return extend


class LinesXarrayCommonX(LinesAxis1):
def __init__(self, x, y, x_dim_index: int):
super().__init__(x, y)
self.x_dim_index = x_dim_index

def __hash__(self):
# This ensures that @memoize below caches different functions for different x_dim_index.
return hash((type(self), self.x_dim_index))

def compute_x_bounds(self, dataset):
bounds = self._compute_bounds(dataset[self.x])
return self.maybe_expand_bounds(bounds)

def compute_y_bounds(self, dataset):
bounds = self._compute_bounds(dataset[self.y])
return self.maybe_expand_bounds(bounds)

def compute_bounds_dask(self, xr_ds):
return self.compute_x_bounds(xr_ds), self.compute_y_bounds(xr_ds)

def validate(self, in_dshape):
if not isreal(in_dshape.measure[str(self.x)]):
raise ValueError('x column must be real')

Check warning on line 639 in datashader/glyphs/line.py

View check run for this annotation

Codecov / codecov/patch

datashader/glyphs/line.py#L639

Added line #L639 was not covered by tests

if not isreal(in_dshape.measure[str(self.y)]):
raise ValueError('y column must be real')

Check warning on line 642 in datashader/glyphs/line.py

View check run for this annotation

Codecov / codecov/patch

datashader/glyphs/line.py#L642

Added line #L642 was not covered by tests

@memoize
def _internal_build_extend(
self, x_mapper, y_mapper, info, append, line_width, antialias_stage_2,
antialias_stage_2_funcs,
):
expand_aggs_and_cols = self.expand_aggs_and_cols(append)
draw_segment, antialias_stage_2_funcs = _line_internal_build_extend(
x_mapper, y_mapper, append, line_width, antialias_stage_2, antialias_stage_2_funcs,
expand_aggs_and_cols,
)
swap_dims = self.x_dim_index == 0
extend_cpu, extend_cuda = _build_extend_line_axis1_x_constant(
draw_segment, expand_aggs_and_cols, antialias_stage_2_funcs, swap_dims,
)

x_name = self.x
y_name = self.y

def extend(aggs, df, vt, bounds, plot_start=True):
sx, tx, sy, ty = vt
xmin, xmax, ymin, ymax = bounds
aggs_and_cols = aggs + info(df, aggs[0].shape[:2])

if cudf and isinstance(df, cudf.DataFrame):
xs = cp.asarray(df[x_name])
ys = cp.asarray(df[y_name])
do_extend = extend_cuda[cuda_args(ys.shape)]

Check warning on line 670 in datashader/glyphs/line.py

View check run for this annotation

Codecov / codecov/patch

datashader/glyphs/line.py#L668-L670

Added lines #L668 - L670 were not covered by tests
elif cp and isinstance(df[y_name].data, cp.ndarray):
xs = cp.asarray(df[x_name])
ys = df[y_name].data
shape = ys.shape[::-1] if swap_dims else ys.shape
do_extend = extend_cuda[cuda_args(shape)]

Check warning on line 675 in datashader/glyphs/line.py

View check run for this annotation

Codecov / codecov/patch

datashader/glyphs/line.py#L672-L675

Added lines #L672 - L675 were not covered by tests
else:
xs = df[x_name].to_numpy()
ys = df[y_name].to_numpy()
do_extend = extend_cpu

do_extend(
sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, antialias_stage_2, *aggs_and_cols
)

return extend
Expand Down Expand Up @@ -1263,7 +1335,7 @@


def _build_extend_line_axis1_x_constant(draw_segment, expand_aggs_and_cols,
antialias_stage_2_funcs):
antialias_stage_2_funcs, swap_dims: bool = False):
if antialias_stage_2_funcs is not None:
aa_stage_2_accumulate, aa_stage_2_clear, aa_stage_2_copy_back = antialias_stage_2_funcs
use_2_stage_agg = antialias_stage_2_funcs is not None
Expand All @@ -1274,22 +1346,24 @@
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols
):
x0 = xs[j]
y0 = ys[i, j]
x1 = xs[j + 1]
y1 = ys[i, j + 1]

segment_start = (
(j == 0) or isnull(xs[j - 1]) or isnull(ys[i, j - 1])
)

segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[i, j+2])
if swap_dims:
y0 = ys[j, i]
y1 = ys[j + 1, i]
segment_start = (j == 0) or isnull(xs[j - 1]) or isnull(ys[j - 1, i])
segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[j+2, i])
else:
y0 = ys[i, j]
y1 = ys[i, j + 1]
segment_start = (j == 0) or isnull(xs[j - 1]) or isnull(ys[i, j - 1])
segment_end = (j == len(xs)-2) or isnull(xs[j+2]) or isnull(ys[i, j+2])

if segment_start or use_2_stage_agg:
xm = 0.0
ym = 0.0
else:
xm = xs[j-1]
ym = ys[i, j-1]
ym = ys[j-1, i] if swap_dims else ys[i, j-1]

draw_segment(i, sx, tx, sy, ty, xmin, xmax, ymin, ymax,
segment_start, segment_end, x0, x1, y0, y1,
Expand All @@ -1301,8 +1375,8 @@
*aggs_and_cols):
antialias = antialias_stage_2 is not None
buffer = np.empty(8) if antialias else None
ncols = ys.shape[1]
for i in range(ys.shape[0]):
ncols, nrows = ys.shape if swap_dims else ys.shape[::-1]
for i in range(nrows):
for j in range(ncols - 1):
perform_extend_line(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols
Expand Down Expand Up @@ -1347,10 +1421,10 @@
antialias = antialias_stage_2 is not None
buffer = cuda.local.array(8, nb_types.float64) if antialias else None
i, j = cuda.grid(2)
if i < ys.shape[0] and j < ys.shape[1] - 1:
ncols, nrows = ys.shape if swap_dims else ys.shape[::-1]
if i < nrows and j < ncols - 1:

Check warning on line 1425 in datashader/glyphs/line.py

View check run for this annotation

Codecov / codecov/patch

datashader/glyphs/line.py#L1424-L1425

Added lines #L1424 - L1425 were not covered by tests
perform_extend_line(
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys,
buffer, *aggs_and_cols
i, j, sx, tx, sy, ty, xmin, xmax, ymin, ymax, xs, ys, buffer, *aggs_and_cols
)

if use_2_stage_agg:
Expand Down
Loading