Where reduction using dataframe row index (#1164)
* Basic row index implementation for pandas and dask dataframes

* Only calculate row offsets if needed

* Use column name of None to represent row index

* Use int64 for row index aggs

* Support where(first(...)) and where(last(...)) reductions

* Handle dask where first/last possibilities
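To illustrate the feature summarized in the bullets above: when ``where`` is given no lookup column it returns row indexes instead of column values. A minimal usage sketch (hypothetical data; column names are illustrative, the API calls are those exercised by the tests in this commit):

import datashader as ds
import pandas as pd

df = pd.DataFrame(dict(x=[0.0, 1.0], y=[1.0, 2.0], value=[1, 2], other=[1.2, 3.4]))
canvas = ds.Canvas(plot_width=10, plot_height=10)

# Lookup column given: float64 aggregation of 'other' values, NaN where empty.
agg_other = canvas.points(df, 'x', 'y', agg=ds.where(ds.max('value'), 'other'))

# No lookup column: int64 aggregation of row indexes, -1 where empty.
agg_rows = canvas.points(df, 'x', 'y', agg=ds.where(ds.max('value')))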
ianthomas23 authored Jan 19, 2023
1 parent e4ad67c commit 73d3deb
Showing 4 changed files with 154 additions and 15 deletions.
15 changes: 15 additions & 0 deletions datashader/data_libraries/dask.py
@@ -77,6 +77,20 @@ def default(glyph, df, schema, canvas, summary, *, antialias=False, cuda=False):
y_mapper = canvas.y_axis.mapper
extend = glyph._build_extend(x_mapper, y_mapper, info, append, antialias_stage_2)

if summary.uses_row_index() and isinstance(df, dd.DataFrame) and df.npartitions > 1:
def func(partition: pd.DataFrame, cumulative_lens, partition_info=None):
# This function is called once for each dask dataframe partition.
# It sets the _datashader_row_offset attribute so that row indexes
# can be calculated correctly in the reductions.extract class.
if partition_info is not None:
partition_index = partition_info["number"]
row_offset = cumulative_lens[partition_index-1] if partition_index > 0 else 0
partition.attrs["_datashader_row_offset"] = row_offset
return partition

cumulative_lengths = df.map_partitions(len).compute().cumsum().to_numpy()
df = df.map_partitions(func, cumulative_lengths)

# Here be dragons
# Get the dataframe graph
graph = df.__dask_graph__()
@@ -126,6 +140,7 @@ def default(glyph, df, schema, canvas, summary, *, antialias=False, cuda=False):

def chunk(df, axis, keepdims):
""" used in the dask.array.reduction chunk step """
# df is a pandas.DataFrame computed from one dask.DataFrame partition
aggs = create(shape)
extend(aggs, df, st, bounds)
return aggs
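For reference, here is a standalone sketch (illustrative, not part of the commit) of the row-offset mechanism used above: the per-partition lengths are computed and accumulated once, then dask's partition_info keyword supplies each partition's number at compute time so its global row offset can be attached as a DataFrame attribute.

import dask.dataframe as dd
import numpy as np
import pandas as pd

pdf = pd.DataFrame({"value": np.arange(10)})
ddf = dd.from_pandas(pdf, npartitions=3)

# Per-partition lengths, accumulated: e.g. array([4, 8, 10]).
cumulative_lengths = ddf.map_partitions(len).compute().cumsum().to_numpy()

def attach_offsets(partition, cumulative_lens, partition_info=None):
    # partition_info is None during graph construction and a dict like
    # {"number": i, ...} at compute time, hence the guard.
    if partition_info is not None:
        n = partition_info["number"]
        offset = cumulative_lens[n - 1] if n > 0 else 0
        partition.attrs["_datashader_row_offset"] = offset
    return partition

ddf = ddf.map_partitions(attach_offsets, cumulative_lengths)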
82 changes: 71 additions & 11 deletions datashader/reductions.py
@@ -49,9 +49,13 @@ def apply(self, df):
elif isinstance(df, xr.Dataset):
# DataArray could be backed by numpy or cupy array
return df[self.column].data
elif self.column is None:
row_offset = df.attrs.get("_datashader_row_offset", 0)
return np.arange(row_offset, row_offset+len(df), dtype=np.int64)
else:
return df[self.column].values
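A small worked example of the row-index branch above (values illustrative; the offset attribute is only attached to dask partitions, so plain pandas DataFrames fall back to an offset of 0):

import numpy as np
import pandas as pd

df = pd.DataFrame({"value": [3.0, 1.0, 2.0]})
df.attrs["_datashader_row_offset"] = 5  # as attached per dask partition

row_offset = df.attrs.get("_datashader_row_offset", 0)
row_index = np.arange(row_offset, row_offset + len(df), dtype=np.int64)
# row_index == array([5, 6, 7])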


class CategoryPreprocess(Preprocess):
"""Base class for categorizing preprocessors."""
@property
@@ -230,6 +234,9 @@ class Reduction(Expr):
def __init__(self, column=None):
self.column = column

def uses_row_index(self):
return False

def validate(self, in_dshape):
if self.column not in in_dshape.dict:
raise ValueError("specified column not found")
@@ -283,6 +290,8 @@ def _build_create(self, required_dshape):
return self._create_float32_nan
elif required_dshape == dshape(ct.float64):
return self._create_float64_nan
elif required_dshape == dshape(ct.int64):
return self._create_int64
elif required_dshape == dshape(ct.uint32):
return self._create_uint32
else:
@@ -334,6 +343,10 @@ def _create_float64_empty(shape, array_module):
def _create_float64_zero(shape, array_module):
return array_module.zeros(shape, dtype='f8')

@staticmethod
def _create_int64(shape, array_module):
return array_module.full(shape, -1, dtype='i8')

@staticmethod
def _create_uint32(shape, array_module):
return array_module.zeros(shape, dtype='u4')
@@ -1229,6 +1242,10 @@ class where(FloatingReduction):
Returns values from a ``lookup_column`` corresponding to a ``selector``
reduction that is applied to some other column.
If ``lookup_column`` is ``None`` then the row index within the
DataFrame is used instead of a named column. The result is then an
int64 aggregation, with -1 denoting no value.

Example
-------
>>> canvas.line(df, 'x', 'y', agg=ds.where(ds.max("value"), "other")) # doctest: +SKIP
@@ -1242,13 +1259,13 @@ class where(FloatingReduction):
Reduction used to select the values of the ``lookup_column`` which are
returned by this ``where`` reduction.
lookup_column : str
lookup_column : str | None
Column containing values that are returned from this ``where``
reduction.
reduction, or ``None`` to return row indexes instead.
"""
def __init__(self, selector: Reduction, lookup_column: str):
if not isinstance(selector, (max, min)):
raise TypeError("selector can only be a max or min reduction")
def __init__(self, selector: Reduction, lookup_column: str | None = None):
if not isinstance(selector, (first, last, max, min)):
raise TypeError("selector can only be a first, last, max or min reduction")
super().__init__(lookup_column)
self.selector = selector
# List of all column names that this reduction uses.
@@ -1258,10 +1275,17 @@ def __hash__(self):
return hash((type(self), self._hashable_inputs(), self.selector))

def out_dshape(self, input_dshape, antialias):
return self.selector.out_dshape(input_dshape, antialias)
if self.uses_row_index():
return dshape(ct.int64)
else:
return dshape(ct.float64)

def uses_row_index(self):
return self.column is None

def validate(self, in_dshape):
super().validate(in_dshape)
if self.column is not None:
super().validate(in_dshape)
self.selector.validate(in_dshape)
if self.column is not None and self.column == self.selector.column:
raise ValueError("where and its contained reduction cannot use the same column")
@@ -1285,17 +1309,27 @@ def _append_antialias(x, y, agg, field, aa_factor):
def _build_append(self, dshape, schema, cuda, antialias, self_intersect):
if cuda:
raise NotImplementedError("where reduction not supported on CUDA")
return super()._build_append(dshape, schema, cuda, antialias, self_intersect)

# If self.column is None then the append function still receives a
# 'field' argument, which contains the row index.
if antialias:
return self._append_antialias
else:
return self._append

def _build_bases(self, cuda=False):
return self.selector._build_bases(cuda=cuda) + super()._build_bases(cuda=cuda)

def _build_combine(self, dshape, antialias):
# Does not support categorical reductions or CUDA.
append = self.selector._append
selector = self.selector
uses_row_index = self.uses_row_index()
append = selector._append

# The combine functions are identical except for the test used to
# determine valid values: not isnull(value) for floats, value != -1
# for integers.
@ngjit
def combine(aggs, selector_aggs):
def combine_float(aggs, selector_aggs):
if len(aggs) > 1:
ny, nx = aggs[0].shape
for y in range(ny):
@@ -1305,7 +1339,30 @@ def combine(aggs, selector_aggs):
aggs[0][y, x] = aggs[1][y, x]
return aggs[0], selector_aggs[0]

return combine
@ngjit
def combine_int(aggs, selector_aggs):
if len(aggs) > 1:
ny, nx = aggs[0].shape
for y in range(ny):
for x in range(nx):
value = selector_aggs[1][y, x]
if value != -1 and append(x, y, selector_aggs[0], value):
aggs[0][y, x] = aggs[1][y, x]
return aggs[0], selector_aggs[0]

def wrapped_combine(aggs, selector_aggs):
# Equivalent check to first._combine and last._combine
# When first and last are supported for dask, this function can be
# removed and combine_int/combine_float returned directly.
if isinstance(selector, (first, last)):
raise NotImplementedError("first and last are not implemented for dask DataFrames")

if uses_row_index:
return combine_int(aggs, selector_aggs)
else:
return combine_float(aggs, selector_aggs)

return wrapped_combine

def _build_combine_temps(self, cuda=False):
return (self.selector,)
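To make the combine step concrete, here is a plain-numpy sketch (not the jitted datashader code; the data and the max-style append helper are illustrative) of merging two chunks when the lookup aggregation holds row indexes:

import numpy as np

def max_append(x, y, agg, value):
    # A max-selector append: update the cell if the new value wins
    # (NaN marks an empty cell) and report whether it did.
    if np.isnan(agg[y, x]) or value > agg[y, x]:
        agg[y, x] = value
        return True
    return False

# Row-index aggs are int64 with -1 meaning "no value"; the selector aggs
# here are float64 maxima with NaN meaning "empty".
aggs = [np.array([[0, -1]], dtype=np.int64), np.array([[3, 4]], dtype=np.int64)]
selector_aggs = [np.array([[2.0, np.nan]]), np.array([[5.0, 1.0]])]

ny, nx = aggs[0].shape
for y in range(ny):
    for x in range(nx):
        value = selector_aggs[1][y, x]
        if not np.isnan(value) and max_append(x, y, selector_aggs[0], value):
            aggs[0][y, x] = aggs[1][y, x]

print(aggs[0])  # [[3 4]] -- both cells won by the second chunk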
@@ -1346,6 +1403,9 @@ def __init__(self, **kwargs):
def __hash__(self):
return hash((type(self), tuple(self.keys), tuple(self.values)))

def uses_row_index(self):
return any(v.uses_row_index() for v in self.values)

def validate(self, input_dshape):
for v in self.values:
v.validate(input_dshape)
14 changes: 14 additions & 0 deletions datashader/tests/test_dask.py
@@ -189,6 +189,13 @@ def test_where_max(ddf, npartitions):
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('f32'), 'reverse')), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('f64'), 'reverse')), out)

# Using row index.
out = xr.DataArray([[4, 14], [9, 19]], coords=coords, dims=dims)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('i32'))), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('i64'))), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('f64'))), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.max('f32'))), out)


@pytest.mark.parametrize('ddf',[_ddf])
@pytest.mark.parametrize('npartitions', [1, 2, 3, 4])
@@ -202,6 +209,13 @@ def test_where_min(ddf, npartitions):
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('f32'), 'reverse')), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('f64'), 'reverse')), out)

# Using row index.
out = xr.DataArray([[0, 10], [5, 15]], coords=coords, dims=dims)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('i32'))), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('i64'))), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('f64'))), out)
assert_eq_xr(c.points(ddf, 'x', 'y', ds.where(ds.min('f32'))), out)


@pytest.mark.skipif(not test_gpu, reason="DATASHADER_TEST_GPU not set")
def test_where_cuda():
58 changes: 54 additions & 4 deletions datashader/tests/test_pandas.py
@@ -208,6 +208,42 @@ def test_max(df):
assert_eq_xr(c.points(df, 'x', 'y', ds.max('f64')), out)


@pytest.mark.parametrize('df', dfs_pd)
def test_where_first(df):
# Note: reductions like ds.where(ds.first('i32'), 'reverse') are supported,
# but the same results can be achieved with the simpler ds.first('reverse').
out = xr.DataArray([[20, 10], [15, 5]], coords=coords, dims=dims)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('i32'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('i64'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('f32'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('f64'), 'reverse')), out)

# Using row index.
out = xr.DataArray([[0, 10], [5, 15]], coords=coords, dims=dims)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('i32'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('i64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('f64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.first('f32'))), out)


@pytest.mark.parametrize('df', dfs_pd)
def test_where_last(df):
# Note: reductions like ds.where(ds.last('i32'), 'reverse') are supported,
# but the same results can be achieved with the simpler ds.last('reverse').
out = xr.DataArray([[16, 6], [11, 1]], coords=coords, dims=dims)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('i32'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('i64'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('f32'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('f64'), 'reverse')), out)

# Using row index.
out = xr.DataArray([[4, 14], [9, 19]], coords=coords, dims=dims)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('i32'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('i64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('f64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.last('f32'))), out)


@pytest.mark.parametrize('df', dfs_pd)
def test_where_max(df):
out = xr.DataArray([[16, 6], [11, 1]], coords=coords, dims=dims)
@@ -216,6 +252,13 @@ def test_where_max(df):
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('f32'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('f64'), 'reverse')), out)

# Using row index.
out = xr.DataArray([[4, 14], [9, 19]], coords=coords, dims=dims)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('i32'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('i64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('f64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.max('f32'))), out)


@pytest.mark.parametrize('df', dfs_pd)
def test_where_min(df):
@@ -225,6 +268,13 @@
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('f32'), 'reverse')), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('f64'), 'reverse')), out)

# Using row index.
out = xr.DataArray([[0, 10], [5, 15]], coords=coords, dims=dims)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('i32'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('i64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('f64'))), out)
assert_eq_xr(c.points(df, 'x', 'y', ds.where(ds.min('f32'))), out)


@pytest.mark.skipif(not test_gpu, reason="DATASHADER_TEST_GPU not set")
def test_where_cuda():
@@ -2268,10 +2318,12 @@ def test_line_antialias_where():
(ds.max("value"), np.float64, np.float64),
(ds.min("value"), np.float64, np.float64),
(ds.sum("value"), np.float64, np.float64),
(ds.where(ds.max("value")), np.int64, np.int64),
(ds.where(ds.max("value"), "other"), np.float64, np.float64),
])
def test_reduction_dtype(reduction, dtype, aa_dtype):
cvs = ds.Canvas(plot_width=10, plot_height=10)
df = pd.DataFrame(dict(x=[0, 1], y=[1, 2], value=[1, 2]))
df = pd.DataFrame(dict(x=[0, 1], y=[1, 2], value=[1, 2], other=[1.2, 3.4]))

# Non-antialiased lines
agg = cvs.line(df, 'x', 'y', line_width=0, agg=reduction)
@@ -2297,8 +2349,6 @@ def test_log_axis_not_positive(df, canvas):
@pytest.mark.parametrize('selector', [
ds.any(),
ds.count(),
ds.first('value'),
ds.last('value'),
ds.mean('value'),
ds.std('value'),
ds.sum('value'),
Expand All @@ -2310,7 +2360,7 @@ def test_where_unsupported_selector(selector):
cvs = ds.Canvas(plot_width=10, plot_height=10)
df = pd.DataFrame(dict(x=[0, 1], y=[1, 2], value=[1, 2]))

with pytest.raises(TypeError, match='selector can only be a max or min reduction'):
with pytest.raises(TypeError, match='selector can only be a first, last, max or min reduction'):
cvs.line(df, 'x', 'y', agg=ds.where(selector, 'value'))


