Merge branch 'main' into multiple-groupers-3
* main:
  More consistent fill_value handling.
  isnull instead of isnan
  Add funding badge (#77)
dcherian committed Mar 15, 2022
2 parents af86cea + 35dd38d · commit 398a6fb
Showing 5 changed files with 123 additions and 68 deletions.
README.md — 2 changes: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-[![GitHub Workflow CI Status](https://img.shields.io/github/workflow/status/dcherian/flox/CI?logo=github&style=flat)](https://github.com/dcherian/flox/actions)[![pre-commit.ci status](https://results.pre-commit.ci/badge/github/dcherian/flox/main.svg)](https://results.pre-commit.ci/latest/github/dcherian/flox/main)[![image](https://img.shields.io/codecov/c/github/dcherian/flox.svg?style=flat)](https://codecov.io/gh/dcherian/flox)[![PyPI](https://img.shields.io/pypi/v/flox.svg?style=flat)](https://pypi.org/project/flox/)[![Conda-forge](https://img.shields.io/conda/vn/conda-forge/flox.svg?style=flat)](https://anaconda.org/conda-forge/flox)[![Documentation Status](https://readthedocs.org/projects/flox/badge/?version=latest)](https://flox.readthedocs.io/en/latest/?badge=latest)
+[![GitHub Workflow CI Status](https://img.shields.io/github/workflow/status/dcherian/flox/CI?logo=github&style=flat)](https://github.com/dcherian/flox/actions)[![pre-commit.ci status](https://results.pre-commit.ci/badge/github/dcherian/flox/main.svg)](https://results.pre-commit.ci/latest/github/dcherian/flox/main)[![image](https://img.shields.io/codecov/c/github/dcherian/flox.svg?style=flat)](https://codecov.io/gh/dcherian/flox)[![PyPI](https://img.shields.io/pypi/v/flox.svg?style=flat)](https://pypi.org/project/flox/)[![Conda-forge](https://img.shields.io/conda/vn/conda-forge/flox.svg?style=flat)](https://anaconda.org/conda-forge/flox)[![Documentation Status](https://readthedocs.org/projects/flox/badge/?version=latest)](https://flox.readthedocs.io/en/latest/?badge=latest)[![NASA-80NSSC18M0156](https://img.shields.io/badge/NASA-80NSSC18M0156-blue)](https://earthdata.nasa.gov/esds/competitive-programs/access/pangeo-ml)
 
 # flox
 
Expand Down
flox/aggregate_npg.py — 22 changes: 22 additions & 0 deletions
@@ -1,3 +1,5 @@
+from functools import partial
+
 import numpy as np
 import numpy_groupies as npg
 
@@ -64,3 +66,23 @@ def nansum_of_squares(group_idx, array, engine, *, axis=-1, size=None, fill_valu
         axis=axis,
         dtype=dtype,
     )
+
+
+def _len(group_idx, array, engine, *, func, axis=-1, size=None, fill_value=None, dtype=None):
+    result = _get_aggregate(engine).aggregate(
+        group_idx,
+        array,
+        axis=axis,
+        func=func,
+        size=size,
+        fill_value=0,
+        dtype=np.int64,
+    )
+    if fill_value is not None:
+        result = result.astype(np.array([fill_value]).dtype)
+        result[result == 0] = fill_value
+    return result
+
+
+len = partial(_len, func="len")
+nanlen = partial(_len, func="nanlen")
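
The two-step dance in `_len` — aggregate with `fill_value=0`, then cast and back-fill — exists because npg's `len`/`nanlen` count group members, and a count of 0 marks a group that never appeared (or, for `nanlen`, was all-NaN). A minimal sketch of the same behaviour, calling numpy_groupies directly; the example data is hypothetical, not from this commit:

```python
import numpy as np
import numpy_groupies as npg

group_idx = np.array([0, 0, 2, 2])  # group 1 never appears
array = np.array([1.0, np.nan, 3.0, 4.0])

# Step 1: count members per group; absent groups come back as 0.
counts = npg.aggregate(group_idx, array, func="len", size=3, fill_value=0, dtype=np.int64)
print(counts)  # [2 0 2]

# Step 2: what _len does when a fill_value is requested — cast so the
# fill_value is representable, then replace the 0 counts with it.
result = counts.astype(np.array([np.nan]).dtype)  # float64
result[result == 0] = np.nan
print(result)  # [ 2. nan  2.]
```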
flox/aggregations.py — 39 changes: 35 additions & 4 deletions
@@ -107,7 +107,7 @@ def __init__(
         self.preprocess = preprocess
         # Use "chunk_reduce" or "chunk_argreduce"
         self.reduction_type = reduction_type
-        self.numpy = numpy if numpy else self.name
+        self.numpy = (numpy,) if numpy else (self.name,)
         # initialize blockwise reduction
         self.chunk = _atleast_1d(chunk)
         # how to aggregate results after first round of reduction
@@ -163,6 +163,7 @@ def __repr__(self):
                 f"combine: {self.combine}",
                 f"aggregate: {self.aggregate}",
                 f"finalize: {self.finalize}",
+                f"min_count: {self.min_count}",
             )
         )
 
@@ -265,9 +266,9 @@ def _std_finalize(sumsq, sum_, count, ddof=0):
 
 
 min_ = Aggregation("min", chunk="min", combine="min", fill_value=dtypes.INF)
-nanmin = Aggregation("nanmin", chunk="nanmin", combine="min", fill_value=dtypes.INF)
+nanmin = Aggregation("nanmin", chunk="nanmin", combine="nanmin", fill_value=np.nan)
 max_ = Aggregation("max", chunk="max", combine="max", fill_value=dtypes.NINF)
-nanmax = Aggregation("nanmax", chunk="nanmax", combine="max", fill_value=dtypes.NINF)
+nanmax = Aggregation("nanmax", chunk="nanmax", combine="nanmax", fill_value=np.nan)
 
 
 def argreduce_preprocess(array, axis):
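
The switch from `combine="min"` to `combine="nanmin"` (with the intermediate fill_value changing from `dtypes.INF` to NaN) matters once per-block partial results are filled with NaN for groups absent from a block: a plain min would propagate that NaN into the combined result, while nanmin treats it as "no data". A small illustration with hypothetical partial results, not taken from this commit:

```python
import numpy as np

# Per-block partial minima for one group across three blocks; the group is
# missing from block 1, so that slot holds the new intermediate fill_value, NaN.
partials = np.array([2.0, np.nan, 5.0])

print(np.min(partials))     # nan -- "min" as the combine step poisons the result
print(np.nanmin(partials))  # 2.0 -- "nanmin" ignores the NaN filler
```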
@@ -409,7 +410,13 @@ def _zip_index(array_, idx_):
 }
 
 
-def _initialize_aggregation(func: str | Aggregation, array_dtype, fill_value) -> Aggregation:
+def _initialize_aggregation(
+    func: str | Aggregation,
+    array_dtype,
+    fill_value,
+    min_count: int,
+    finalize_kwargs,
+) -> Aggregation:
     if not isinstance(func, Aggregation):
         try:
             # TODO: need better interface
@@ -425,6 +432,7 @@ def _initialize_aggregation(func: str | Aggregation, array_dtype, fill_value) ->
         raise ValueError("Bad type for func. Expected str or Aggregation")
 
     agg.dtype[func] = _normalize_dtype(agg.dtype[func], array_dtype, fill_value)
+    agg.dtype["numpy"] = (agg.dtype[func],)
     agg.dtype["intermediate"] = [
         _normalize_dtype(dtype, array_dtype) for dtype in agg.dtype["intermediate"]
     ]
@@ -435,4 +443,27 @@ def _initialize_aggregation(func: str | Aggregation, array_dtype, fill_value) ->
         for dt, fv in zip(agg.dtype["intermediate"], agg.fill_value["intermediate"])
     )
     agg.fill_value[func] = _get_fill_value(agg.dtype[func], agg.fill_value[func])
+
+    fv = fill_value if fill_value is not None else agg.fill_value[agg.name]
+    agg.fill_value["numpy"] = (fv,)
+
+    if finalize_kwargs is not None:
+        assert isinstance(finalize_kwargs, dict)
+        agg.finalize_kwargs = finalize_kwargs
+
+    # This is needed for the dask pathway.
+    # Because we use intermediate fill_value since a group could be
+    # absent in one block, but present in another block
+    # We set it for numpy to get nansum, nanprod tests to pass
+    # where the identity element is 0, 1
+    if min_count is not None:
+        agg.min_count = min_count
+        agg.chunk += ("nanlen",)
+        agg.numpy += ("nanlen",)
+        agg.combine += ("sum",)
+        agg.fill_value["intermediate"] += (0,)
+        agg.fill_value["numpy"] += (0,)
+        agg.dtype["intermediate"] += (np.intp,)
+        agg.dtype["numpy"] += (np.intp,)
+
     return agg
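
The effect of the new block: whenever min_count is set, the aggregation carries a companion "nanlen" reduction (combined across blocks with "sum"), so that groups with fewer than min_count non-NaN members can be masked to fill_value afterwards. Why a count is needed at all, sketched in plain numpy as an illustration only:

```python
import numpy as np

values = np.array([np.nan, np.nan])  # one group, all-NaN observations

total = np.nansum(values)          # 0.0 -- the identity, indistinguishable
                                   # from a group that genuinely sums to zero
count = np.sum(~np.isnan(values))  # 0   -- the companion "nanlen" intermediate

min_count = 1
result = total if count >= min_count else np.nan
print(result)  # nan, matching xarray's skipna semantics
```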
flox/core.py — 83 changes: 34 additions & 49 deletions
@@ -162,7 +162,7 @@ def find_group_cohorts(labels, chunks, merge=True, method="cohorts"):
     labels = np.asarray(labels)
 
     if method == "split-reduce":
-        return pd.unique(labels.ravel()).reshape(-1, 1).tolist()
+        return _get_expected_groups(labels, sort=False).values.reshape(-1, 1).tolist()
 
     # Build an array with the shape of labels, but where every element is the "chunk number"
     # 1. First subset the array appropriately
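
For context, "split-reduce" puts every label in its own singleton cohort; the change only swaps `pd.unique` for the `_get_expected_groups` helper (which also drops NaN labels). The returned structure, shown with illustrative data:

```python
import numpy as np
import pandas as pd

labels = np.array([[1, 1, 2], [2, 3, 3]])

# Equivalent of the old line: one cohort per unique label, in order of appearance.
cohorts = pd.unique(labels.ravel()).reshape(-1, 1).tolist()
print(cohorts)  # [[1], [2], [3]]
```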
@@ -180,7 +180,7 @@ def find_group_cohorts(labels, chunks, merge=True, method="cohorts"):
     # We always drop NaN; np.unique also considers every NaN to be different so
     # it's really important we get rid of them.
     raveled = labels.ravel()
-    unique_labels = np.unique(raveled[~np.isnan(raveled)])
+    unique_labels = np.unique(raveled[~isnull(raveled)])
     # these are chunks where a label is present
     label_chunks = {lab: tuple(np.unique(which_chunk[raveled == lab])) for lab in unique_labels}
     # These invert the label_chunks mapping so we know which labels occur together.
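
The isnull-for-np.isnan swap (one of the merged main commits) matters because group labels need not be floats: np.isnan raises a TypeError on string or object arrays. flox's actual isnull helper is not shown in this diff; a minimal sketch of the idea, as a hypothetical implementation, might look like:

```python
import numpy as np

def isnull(data):
    # Hypothetical sketch: NaN/NaT detection that tolerates any dtype.
    data = np.asarray(data)
    if np.issubdtype(data.dtype, np.floating) or np.issubdtype(data.dtype, np.complexfloating):
        return np.isnan(data)
    if data.dtype.kind in "mM":  # datetime64 / timedelta64
        return np.isnat(data)
    # Integer, bool, and string labels cannot hold missing values.
    return np.zeros(data.shape, dtype=bool)

labels = np.array(["a", "b", "a"])
# np.isnan(labels) would raise TypeError; isnull degrades gracefully:
print(isnull(labels))  # [False False False]
```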
@@ -361,7 +361,7 @@ def reindex_(
         raise ValueError("Filling is required. fill_value cannot be None.")
     indexer[axis] = idx == -1
     # This allows us to match xarray's type promotion rules
-    if fill_value is xrdtypes.NA or np.isnan(fill_value):
+    if fill_value is xrdtypes.NA or isnull(fill_value):
         new_dtype, fill_value = xrdtypes.maybe_promote(reindexed.dtype)
         reindexed = reindexed.astype(new_dtype, copy=False)
     reindexed[tuple(indexer)] = fill_value
@@ -429,7 +429,7 @@ def factorize_(
         else:
             sorter = None
         idx = np.searchsorted(expect, groupvar.ravel(), sorter=sorter)
-        mask = np.isnan(groupvar.ravel())
+        mask = isnull(groupvar.ravel())
         # TODO: optimize?
         idx[mask] = -1
         if not sort:
@@ -510,7 +510,7 @@ def chunk_argreduce(
         engine=engine,
         sort=sort,
     )
-    if not np.isnan(results["groups"]).all():
+    if not isnull(results["groups"]).all():
         # will not work for empty groups...
         # glorious
         idx = np.broadcast_to(idx, array.shape)
@@ -639,6 +639,8 @@ def chunk_reduce(
     # counts are needed for the final result as well as for masking
     # optimize that out.
     previous_reduction = None
+    for param in (fill_value, kwargs, dtype):
+        assert len(param) >= len(func)
     for reduction, fv, kw, dt in zip(func, fill_value, kwargs, dtype):
         if empty:
             result = np.full(shape=final_array_shape, fill_value=fv)
@@ -842,7 +844,7 @@ def _grouped_combine(
         # reindexing is unnecessary
         # I bet we can minimize the amount of reindexing for mD reductions too, but it's complicated
         unique_groups = np.unique(tuple(flatten(deepmap(listify_groups, x_chunk))))
-        unique_groups = unique_groups[~np.isnan(unique_groups)]
+        unique_groups = unique_groups[~isnull(unique_groups)]
         if len(unique_groups) == 0:
             unique_groups = [np.nan]
 
@@ -962,13 +964,10 @@ def _reduce_blockwise(array, by, agg, *, axis, expected_groups, fill_value, engi
     Blockwise groupby reduction that produces the final result. This code path is
     also used for non-dask array aggregations.
     """
-
     # for pure numpy grouping, we just use npg directly and avoid "finalizing"
     # (agg.finalize = None). We still need to do the reindexing step in finalize
     # so that everything matches the dask version.
     agg.finalize = None
-    # xarray's count is npg's nanlen
-    func: tuple[str] = (agg.numpy, "nanlen")
 
     assert agg.finalize_kwargs is not None
     finalize_kwargs = agg.finalize_kwargs
@@ -979,14 +978,14 @@ def _reduce_blockwise(array, by, agg, *, axis, expected_groups, fill_value, engi
     results = chunk_reduce(
         array,
         by,
-        func=func,
+        func=agg.numpy,
         axis=axis,
         expected_groups=expected_groups,
         # This fill_value should only apply to groups that only contain NaN observations
         # BUT there is funkiness when axis is a subset of all possible values
         # (see below)
-        fill_value=(agg.fill_value[agg.name], 0),
-        dtype=(agg.dtype[agg.name], np.intp),
+        fill_value=agg.fill_value["numpy"],
+        dtype=agg.dtype["numpy"],
         kwargs=finalize_kwargs,
         engine=engine,
         sort=sort,
@@ -998,36 +997,20 @@ def _reduce_blockwise(array, by, agg, *, axis, expected_groups, fill_value, engi
         # so replace -1 with 0; unravel; then replace 0 with -1
         # UGH!
         idx = results["intermediates"][0]
-        mask = idx == -1
+        mask = idx == agg.fill_value["numpy"][0]
         idx[mask] = 0
         # Fix npg bug where argmax with nD array, 1D group_idx, axis=-1
        # will return wrong indices
         idx = np.unravel_index(idx, array.shape)[-1]
-        idx[mask] = -1
+        idx[mask] = agg.fill_value["numpy"][0]
         results["intermediates"][0] = idx
     elif agg.name in ["nanvar", "nanstd"]:
-        # Fix npg bug where all-NaN rows are 0 instead of NaN
+        # TODO: Fix npg bug where all-NaN rows are 0 instead of NaN
         value, counts = results["intermediates"]
         mask = counts <= 0
         value[mask] = np.nan
         results["intermediates"][0] = value
 
-    # When axis is a subset of possible values; then npg will
-    # apply it to groups that don't exist along a particular axis (for e.g.)
-    # since these count as a group that is absent. thoo!
-    # TODO: the "count" bit is a hack to make tests pass.
-    if len(axis) < by.ndim and agg.min_count is None and agg.name != "count":
-        agg.min_count = 1
-
-    # This fill_value applies to members of expected_groups not seen in groups
-    # or when the min_count threshold is not satisfied
-    # Use xarray's dtypes.NA to match type promotion rules
-    if fill_value is None:
-        if agg.name in ["any", "all"]:
-            fill_value = False
-        elif not _is_arg_reduction(agg):
-            fill_value = xrdtypes.NA
-
     result = _finalize_results(results, agg, axis, expected_groups, fill_value=fill_value)
     return result
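
The nanvar/nanstd branch above (now flagged as a TODO for an upstream npg fix) masks results by the companion count, since npg returns 0 rather than NaN for an all-NaN group. The masking step in isolation, with illustrative values:

```python
import numpy as np

# Hypothetical per-group nanvar output from npg: group 1 was all-NaN,
# yet npg reports 0.0 for it.
value = np.array([0.5, 0.0])
counts = np.array([4, 0])  # non-NaN observations per group

value[counts <= 0] = np.nan
print(value)  # [0.5 nan]
```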

@@ -1519,20 +1502,33 @@ def groupby_reduce(
         array = _move_reduce_dims_to_end(array, axis)
         axis = tuple(array.ndim + np.arange(-len(axis), 0))
 
+    has_dask = is_duck_dask_array(array) or is_duck_dask_array(by)
+
+    # When axis is a subset of possible values, then npg will
+    # apply it to groups that don't exist along a particular axis (for e.g.)
+    # since these count as a group that is absent. thoo!
+    # fill_value applies to all-NaN groups as well as labels in expected_groups that are not found.
+    # The only way to do this consistently is mask out using min_count
+    # Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0
+    if min_count is None:
+        if (
+            len(axis) < by.ndim
+            or fill_value is not None
+            # TODO: Fix npg bug where all-NaN rows are 0 instead of NaN
+            or (not has_dask and isinstance(func, str) and func in ["nanvar", "nanstd"])
+        ):
+            min_count = 1
+
+    # TODO: set in xarray?
     if min_count is not None and func in ["nansum", "nanprod"] and fill_value is None:
         # nansum, nanprod have fill_value=0, 1
         # overwrite them when min_count is set
         fill_value = np.nan
 
-    agg = _initialize_aggregation(func, array.dtype, fill_value)
-    agg.min_count = min_count
-    if finalize_kwargs is not None:
-        assert isinstance(finalize_kwargs, dict)
-        agg.finalize_kwargs = finalize_kwargs
-
     kwargs = dict(axis=axis, fill_value=fill_value, engine=engine, sort=sort)
+    agg = _initialize_aggregation(func, array.dtype, fill_value, min_count, finalize_kwargs)
 
-    if not is_duck_dask_array(array) and not is_duck_dask_array(by):
+    if not has_dask:
         results = _reduce_blockwise(array, by, agg, expected_groups=expected_groups, **kwargs)
         groups = (results["groups"],)
         result = results[agg.name]
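
Taken together, the new rules mean that passing any fill_value now implies min_count=1. An illustrative call against this era of the API (the exact signature and return order are assumed, not part of the commit):

```python
import numpy as np
from flox.core import groupby_reduce

array = np.array([1.0, 2.0, np.nan])
by = np.array([0, 0, 1])

# Group 1 is all-NaN and group 2 never appears; with fill_value given,
# min_count=1 kicks in, so both come back as -1 instead of nansum's identity 0.
result, groups = groupby_reduce(
    array, by, func="nansum", expected_groups=np.array([0, 1, 2]), fill_value=-1
)
print(groups)  # [0 1 2]
print(result)  # [ 3. -1. -1.]
```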
@@ -1541,21 +1537,10 @@ def groupby_reduce(
         if agg.chunk is None:
             raise NotImplementedError(f"{func} not implemented for dask arrays")
 
-        if agg.min_count is None:
-            # This is needed for the dask pathway.
-            # Because we use intermediate fill_value since a group could be
-            # absent in one block, but present in another block
-            agg.min_count = 1
-
         # we always need some fill_value (see above) so choose the default if needed
         if kwargs["fill_value"] is None:
             kwargs["fill_value"] = agg.fill_value[agg.name]
 
-        agg.chunk += ("nanlen",)
-        agg.combine += ("sum",)
-        agg.fill_value["intermediate"] += (0,)
-        agg.dtype["intermediate"] += (np.intp,)
-
         partial_agg = partial(dask_groupby_agg, agg=agg, split_out=split_out, **kwargs)
 
         if method in ["split-reduce", "cohorts"]:
