From 7991c5abca2fad03256bb54b2cd438c836141f13 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 17 Feb 2022 09:35:34 -0700 Subject: [PATCH 01/16] restrict tests --- tests/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/__init__.py b/tests/__init__.py index fef0a778e..53f806972 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -101,7 +101,7 @@ def assert_equal(a, b): np.testing.assert_allclose(a, b, equal_nan=True) -@pytest.fixture(scope="module", params=["flox", "numpy", "numba"]) +@pytest.fixture(scope="module", params=["flox"]) def engine(request): if request.param == "numba": try: From 97972fb70530efd39dc42bb406e8ecf1dc30d984 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 17 Feb 2022 09:38:16 -0700 Subject: [PATCH 02/16] xarray changes --- flox/xarray.py | 98 ++++++++++++++++++-------------------------- tests/test_xarray.py | 1 + 2 files changed, 42 insertions(+), 57 deletions(-) diff --git a/flox/xarray.py b/flox/xarray.py index 842d22f5d..341365d14 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -9,11 +9,9 @@ from .aggregations import Aggregation, _atleast_1d from .core import ( - factorize_, groupby_reduce, rechunk_for_blockwise as rechunk_array_for_blockwise, rechunk_for_cohorts as rechunk_array_for_cohorts, - reindex_, ) from .xrutils import is_duck_dask_array, isnull @@ -21,14 +19,14 @@ from xarray import DataArray, Dataset, Resample -def _get_input_core_dims(group_names, dim, ds, to_group): +def _get_input_core_dims(group_names, dim, ds, grouper_dims): input_core_dims = [[], []] for g in group_names: if g in dim: continue if g in ds.dims: input_core_dims[0].extend([g]) - if g in to_group.dims: + if g in grouper_dims: input_core_dims[1].extend([g]) input_core_dims[0].extend(dim) input_core_dims[1].extend(dim) @@ -182,6 +180,13 @@ def xarray_reduce( if isinstance(isbin, bool): isbin = (isbin,) * len(by) + if expected_groups is None: + expected_groups = (None,) * len(by) + if isinstance(expected_groups, (np.ndarray, list)): # TODO: test for list + if len(by) == 1: + expected_groups = (expected_groups,) + else: + raise ValueError("Needs better message.") if not sort: raise NotImplementedError @@ -196,10 +201,11 @@ def xarray_reduce( by: tuple[DataArray] = tuple(obj[g] if isinstance(g, str) else g for g in by) # type: ignore + # TODO: delete if len(by) > 1 and any(is_duck_dask_array(by_.data) for by_ in by): raise NotImplementedError("Grouping by multiple variables will compute dask variables.") - grouper_dims = set(itertools.chain(*tuple(g.dims for g in by))) + grouper_dims = tuple(itertools.chain(*tuple(g.dims for g in by))) if isinstance(obj, xr.DataArray): ds = obj._to_temp_dataset() @@ -222,7 +228,7 @@ def xarray_reduce( # in the case where dim is Ellipsis, and by.ndim < obj.ndim # then we also broadcast `by` to all `obj.dims` # TODO: avoid this broadcasting - exclude_dims = set(ds.dims) - grouper_dims + exclude_dims = set(ds.dims) - set(grouper_dims) if dim is not None: exclude_dims -= set(dim) ds, *by = xr.broadcast(ds, *by, exclude=exclude_dims) @@ -254,42 +260,34 @@ def xarray_reduce( axis = tuple(range(-len(dim), 0)) group_names = tuple(g.name if not binned else f"{g.name}_bins" for g, binned in zip(by, isbin)) - if len(by) > 1: - group_idx, expected_groups, group_shape, _, _, _ = factorize_( - tuple(g.data for g in by), - axis, - expected_groups, - ) - to_group = xr.DataArray(group_idx, dims=dim, coords={d: by[0][d] for d in by[0].indexes}) - else: - if expected_groups is None and isinstance(by[0].data, np.ndarray): - 
uniques = np.unique(by[0].data) - nans = isnull(uniques) - if nans.any(): - uniques = uniques[~nans] - expected_groups = (uniques,) - if expected_groups is None: + group_shape = [None] * len(by) + expected_groups = list(expected_groups) + for idx, (b, expect, isbin_) in enumerate(zip(by, expected_groups, isbin)): + if expect is None and is_duck_dask_array(b.data): raise NotImplementedError( "Please provide expected_groups if not grouping by a numpy-backed DataArray" ) - if isinstance(expected_groups, np.ndarray): - expected_groups = (expected_groups,) - if isbin[0]: - if isinstance(expected_groups[0], int): + if not isbin_: + uniques = np.unique(b.data) + nans = isnull(uniques) + if nans.any(): + uniques = uniques[~nans] + expected_groups[idx] = uniques + group_shape[idx] = len(uniques) + else: + if isinstance(expect, int): raise NotImplementedError( - "Does not support binning into an integer number of bins yet." + "flox does not support binning into an integer number of bins yet." ) # factorized, bins = pd.cut(by[0], bins=expected_groups[0], retbins=True) - group_shape = (expected_groups[0],) + group_shape[idx] = expect else: - group_shape = (len(expected_groups[0]) - 1,) - else: - group_shape = (len(expected_groups[0]),) - to_group = by[0] + # nbins - 1 elements since expect provides the bin edges + group_shape[idx] = len(expect) - 1 group_sizes = dict(zip(group_names, group_shape)) - def wrapper(array, to_group, *, func, skipna, **kwargs): + def wrapper(array, *by, func, skipna, **kwargs): # Handle skipna here because I need to know dtype to make a good default choice. # We cannnot handle this easily for xarray Datasets in xarray_reduce if skipna and func in ["all", "any", "count"]: @@ -299,19 +297,7 @@ def wrapper(array, to_group, *, func, skipna, **kwargs): if "nan" not in func and func not in ["all", "any", "count"]: func = f"nan{func}" - result, groups = groupby_reduce(array, to_group, func=func, **kwargs) - if len(by) > 1: - # all groups need not be present. reindex here - # TODO: add test - reindexed = reindex_( - result, - from_=groups, - to=pd.Index(np.arange(np.prod(group_shape))), - fill_value=fill_value, - axis=-1, - ) - result = reindexed.reshape(result.shape[:-1] + group_shape) - + result, *groups = groupby_reduce(array, *by, func=func, **kwargs) return result # These data variables do not have any of the core dimension, @@ -327,11 +313,13 @@ def wrapper(array, to_group, *, func, skipna, **kwargs): if is_missing_dim: missing_dim[k] = v - input_core_dims = _get_input_core_dims(group_names, dim, ds, to_group) + input_core_dims = _get_input_core_dims(group_names, dim, ds, grouper_dims) + input_core_dims += [input_core_dims[-1]] * (len(by) - 1) + actual = xr.apply_ufunc( wrapper, - ds.drop_vars(tuple(missing_dim) + bad_dtypes).transpose(..., *to_group.dims), - to_group, + ds.drop_vars(tuple(missing_dim) + bad_dtypes).transpose(..., *grouper_dims), + *by, input_core_dims=input_core_dims, # for xarray's test_groupby_duplicate_coordinate_labels exclude_dims=set(dim), @@ -350,14 +338,8 @@ def wrapper(array, to_group, *, func, skipna, **kwargs): "skipna": skipna, "engine": engine, "reindex": reindex, - # The following mess exists because for multiple `by`s I factorize eagerly - # here before passing it on; this means I have to handle the - # "binning by single by variable" case explicitly where the factorization - # happens later allowing `by` to be a dask variable. 
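# Reviewer sketch (not part of the diff above): the new group-shape logic earlier in
# this hunk sizes a binned grouper as len(expect) - 1 because `expect` holds bin
# *edges*, and N edges define N - 1 intervals. Plain pandas shows the arithmetic:
import numpy as np
import pandas as pd

edges = np.array([0.0, 1.0, 2.0, 4.0])  # 4 bin edges
bins = pd.IntervalIndex.from_arrays(edges[:-1], edges[1:])
assert len(bins) == len(edges) - 1  # 3 right-closed intervals: (0, 1], (1, 2], (2, 4]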
- # Another annoyance is that for resampling expected_groups is "disconnected" - # from "by" so we need the isbin part of the condition - "expected_groups": expected_groups[0] if len(by) == 1 and isbin[0] else None, - "isbin": isbin[0] if len(by) == 1 else False, + "expected_groups": tuple(expected_groups), + "isbin": isbin, "finalize_kwargs": finalize_kwargs, }, ) @@ -369,6 +351,8 @@ def wrapper(array, to_group, *, func, skipna, **kwargs): actual[var] = ds[var] for name, expect, isbin_ in zip(group_names, expected_groups, isbin): + # Can't remove this till I figure out how to return groups from wrapper + # without broadcasting if isbin_: expect = [pd.Interval(left, right) for left, right in zip(expect[:-1], expect[1:])] if isinstance(actual, xr.Dataset) and name in actual: @@ -525,11 +509,11 @@ def resample_reduce( by, func=func, method="blockwise", - expected_groups=(resampler._unique_coord.data,), keep_attrs=keep_attrs, **kwargs, ) .rename({"__resample_dim__": dim}) .transpose(dim, ...) ) + result[dim] = resampler._unique_coord.data return result diff --git a/tests/test_xarray.py b/tests/test_xarray.py index e33a7203e..5f314fe08 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -73,6 +73,7 @@ def test_xarray_reduce(skipna, add_nan, min_count, engine, reindex): # assert_equal(expected, actual) +# TODO: sort def test_xarray_reduce_multiple_groupers(engine): arr = np.ones((4, 12)) From 0db44f737e645a8aa915302fc1934e9ca58e23d5 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 17 Feb 2022 10:02:08 -0700 Subject: [PATCH 03/16] groupby_reduce updates --- flox/core.py | 35 +++++++++++++++++++++++------------ tests/test_core.py | 4 ++-- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/flox/core.py b/flox/core.py index 2da33171f..07deefc39 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1261,14 +1261,15 @@ def _validate_reindex(reindex: bool, func, method, expected_groups) -> bool: def _assert_by_is_aligned(shape, by): - if shape[-by.ndim :] != by.shape: - raise ValueError( - "`array` and `by` arrays must be aligned " - "i.e. array.shape[-by.ndim :] == by.shape. " - "for every array in `by`." - f"Received array of shape {shape} but " - f"`by` has shape {by.shape}." - ) + for idx, b in enumerate(by): + if shape[-b.ndim :] != b.shape: + raise ValueError( + "`array` and `by` arrays must be aligned " + "i.e. array.shape[-by.ndim :] == by.shape. " + "for every array in `by`." + f"Received array of shape {shape} but " + f"array {idx} in `by` has shape {b.shape}." 
+ ) def _convert_expected_groups_to_index(expected_groups, isbin: bool) -> pd.Index | None: @@ -1400,13 +1401,22 @@ def groupby_reduce( ) reindex = _validate_reindex(reindex, func, method, expected_groups) - if not is_duck_array(by): - by = np.asarray(by) + by: tuple = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) + by_is_dask = any(is_duck_dask_array(b) for b in by) if not is_duck_array(array): array = np.asarray(array) + if isinstance(isbin, bool): + isbin = (isbin,) * len(by) + if expected_groups is None: + expected_groups = (None,) * len(by) _assert_by_is_aligned(array.shape, by) + if len(by) == 1 and not isinstance(expected_groups, tuple): + expected_groups = (np.asarray(expected_groups),) + elif len(expected_groups) != len(by): + raise ValueError("len(expected_groups) != len(by)") + # We convert to pd.Index since that lets us know if we are binning or not # (pd.IntervalIndex or not) expected_groups = _convert_expected_groups_to_index(expected_groups, isbin) @@ -1424,8 +1434,9 @@ def groupby_reduce( ) # TODO: make sure expected_groups is unique - if len(axis) == 1 and by.ndim > 1 and expected_groups is None: - if not is_duck_dask_array(by): + if len(axis) == 1 and by_ndim > 1 and expected_groups[0] is None: + # TODO: hack + if not by_is_dask: expected_groups = _get_expected_groups(by, sort) else: # When we reduce along all axes, we are guaranteed to see all diff --git a/tests/test_core.py b/tests/test_core.py index 0c2cc6e48..7773883fa 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -114,7 +114,7 @@ def test_groupby_reduce( elif func == "count": expected = np.array(expected, dtype=int) - result, groups = groupby_reduce( + result, groups, = groupby_reduce( array, by, func=func, @@ -780,7 +780,7 @@ def test_datetime_binning(): time_bins = pd.date_range(start="2010-08-01", end="2010-08-15", freq="24H") by = pd.date_range("2010-08-01", "2010-08-15", freq="15min") - actual = _convert_expected_groups_to_index(time_bins, isbin=True) + (actual,) = _convert_expected_groups_to_index((time_bins,), isbin=(True,)) expected = pd.IntervalIndex.from_arrays(time_bins[:-1], time_bins[1:]) assert_equal(actual, expected) From b2be395a80473a89d3809bb390d0f468c744bb94 Mon Sep 17 00:00:00 2001 From: dcherian Date: Thu, 17 Feb 2022 09:43:52 -0700 Subject: [PATCH 04/16] WIP --- flox/core.py | 97 +++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 80 insertions(+), 17 deletions(-) diff --git a/flox/core.py b/flox/core.py index 07deefc39..586fc308e 100644 --- a/flox/core.py +++ b/flox/core.py @@ -68,7 +68,7 @@ def _get_expected_groups(by, sort, raise_if_dask=True) -> pd.Index | None: expected = pd.unique(flatby[~isnull(flatby)]) if sort: expected = np.sort(expected) - return _convert_expected_groups_to_index(expected, isbin=False) + return _convert_expected_groups_to_index((expected,), isbin=(False,))[0] def _get_chunk_reduction(reduction_type: str) -> Callable: @@ -388,7 +388,12 @@ def offset_labels(labels: np.ndarray, ngroups: int) -> tuple[np.ndarray, int]: def factorize_( - by: tuple, axis, expected_groups: tuple[pd.Index, ...] = None, reindex=False, sort=True + by: tuple, + axis, + expected_groups: tuple[pd.Index, ...] 
= None, + reindex=False, + sort=True, + fastpath=False, ): """ Returns an array of integer codes for groups (and associated data) @@ -440,10 +445,13 @@ def factorize_( grp_shape = tuple(len(grp) for grp in found_groups) ngroups = np.prod(grp_shape) if len(by) > 1: - group_idx = np.ravel_multi_index(factorized, grp_shape).reshape(by[0].shape) + group_idx = np.ravel_multi_index(factorized, grp_shape) else: group_idx = factorized[0] + if fastpath: + return group_idx, found_groups, grp_shape + if np.isscalar(axis) and groupvar.ndim > 1: # Not reducing along all dimensions of by # this is OK because for 3D by and axis=(1,2), @@ -1272,23 +1280,60 @@ def _assert_by_is_aligned(shape, by): ) -def _convert_expected_groups_to_index(expected_groups, isbin: bool) -> pd.Index | None: - if isinstance(expected_groups, pd.IntervalIndex) or ( - isinstance(expected_groups, pd.Index) and not isbin - ): - return expected_groups - if isbin: - return pd.IntervalIndex.from_arrays(expected_groups[:-1], expected_groups[1:]) - elif expected_groups is not None: - return pd.Index(expected_groups) - return expected_groups +def _convert_expected_groups_to_index(expected_groups: tuple, isbin: bool) -> pd.Index | None: + out = [] + for ex, isbin_ in zip(expected_groups, isbin): + if isinstance(ex, pd.IntervalIndex) or (isinstance(ex, pd.Index) and not isbin): + out.append(expected_groups) + elif ex is not None: + if isbin_: + out.append(pd.IntervalIndex.from_arrays(ex[:-1], ex[1:])) + else: + out.append(pd.Index(ex)) + else: + assert ex is None + out.append(None) + return tuple(out) + + +def _lazy_factorize_wrapper(*by, **kwargs): + group_idx, _ = factorize_(by, **kwargs) + return group_idx + + +def _factorize_multiple(by, expected_groups, by_is_dask): + kwargs = dict( + expected_groups=expected_groups, + axis=None, # always None, we offset later if necessary. + fastpath=True, + ) + if by_is_dask: + import dask.array + + group_idx = dask.array.map_blocks( + _lazy_factorize_wrapper, + *np.broadcast_arrays(*by), + meta=np.array((), dtype=np.int64), + **kwargs, + ) + found_groups = tuple(None if is_duck_dask_array(b) else np.unique(b) for b in by) + else: + group_idx, found_groups, grp_shape = factorize_(by, **kwargs) + + final_groups = tuple( + pd.Index(found) if expect is None else expect + for found, expect in zip(found_groups, expected_groups) + ) + + if any(grp is None for grp in final_groups): + raise + return (group_idx,), final_groups, grp_shape def groupby_reduce( array: np.ndarray | DaskArray, - by: np.ndarray | DaskArray, + *by: np.ndarray | DaskArray, func: str | Aggregation, - *, expected_groups: Sequence | np.ndarray | None = None, sort: bool = True, isbin: bool = False, @@ -1402,6 +1447,7 @@ def groupby_reduce( reindex = _validate_reindex(reindex, func, method, expected_groups) by: tuple = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by) + nby = len(by) by_is_dask = any(is_duck_dask_array(b) for b in by) if not is_duck_array(array): array = np.asarray(array) @@ -1423,6 +1469,20 @@ def groupby_reduce( if expected_groups is not None and sort: expected_groups = expected_groups.sort_values() + # when grouping by multiple variables, we factorize early. 
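# Reviewer sketch (not part of the diff above): _factorize_multiple collapses several
# label arrays into one flat integer code per element via np.ravel_multi_index, so the
# reduction machinery only ever sees a single `by`. A minimal NumPy/pandas analogue:
import numpy as np
import pandas as pd

by1 = np.array(["a", "b", "a", "b"])
by2 = np.array([1, 1, 2, 2])
codes1, groups1 = pd.factorize(by1)  # codes1 == [0, 1, 0, 1]
codes2, groups2 = pd.factorize(by2)  # codes2 == [0, 0, 1, 1]
grp_shape = (len(groups1), len(groups2))  # (2, 2)
group_idx = np.ravel_multi_index((codes1, codes2), grp_shape)
print(group_idx)  # [0 2 1 3]: one flat group per (by1, by2) pair
# This is why expected_groups becomes pd.RangeIndex(np.prod(grp_shape)) below, and why
# the flat result can simply be reshaped to grp_shape at the end of groupby_reduce.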
+ # TODO: could restrict this to dask-only + if nby > 1: + by, final_groups, grp_shape = _factorize_multiple( + by, expected_groups, by_is_dask=by_is_dask + ) + expected_groups = (pd.RangeIndex(np.prod(grp_shape)),) + else: + final_groups = expected_groups + + assert len(by) == 1 + by = by[0] + expected_groups = expected_groups[0] + if axis is None: axis = tuple(array.ndim + np.arange(-by.ndim, 0)) else: @@ -1434,7 +1494,7 @@ def groupby_reduce( ) # TODO: make sure expected_groups is unique - if len(axis) == 1 and by_ndim > 1 and expected_groups[0] is None: + if len(axis) == 1 and by.ndim > 1 and expected_groups is None: # TODO: hack if not by_is_dask: expected_groups = _get_expected_groups(by, sort) @@ -1540,7 +1600,7 @@ def groupby_reduce( result, *groups = partial_agg( array, by, - expected_groups=expected_groups, + expected_groups=None if method == "blockwise" else expected_groups, reindex=reindex, method=method, sort=sort, @@ -1552,4 +1612,7 @@ def groupby_reduce( result = result[..., sorted_idx] groups = (groups[0][sorted_idx],) + if nby > 1: + groups = final_groups + result = result.reshape(result.shape[:-1] + grp_shape) return (result, *groups) From e24645d215c9f2d9db59c4c6303ccaf28f8b9d61 Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 11:17:42 -0700 Subject: [PATCH 05/16] fix --- flox/core.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/flox/core.py b/flox/core.py index 586fc308e..7533f0be0 100644 --- a/flox/core.py +++ b/flox/core.py @@ -66,9 +66,7 @@ def _get_expected_groups(by, sort, raise_if_dask=True) -> pd.Index | None: return None flatby = by.ravel() expected = pd.unique(flatby[~isnull(flatby)]) - if sort: - expected = np.sort(expected) - return _convert_expected_groups_to_index((expected,), isbin=(False,))[0] + return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0] def _get_chunk_reduction(reduction_type: str) -> Callable: @@ -1280,15 +1278,21 @@ def _assert_by_is_aligned(shape, by): ) -def _convert_expected_groups_to_index(expected_groups: tuple, isbin: bool) -> pd.Index | None: +def _convert_expected_groups_to_index( + expected_groups: tuple, isbin: bool, sort: bool +) -> pd.Index | None: out = [] for ex, isbin_ in zip(expected_groups, isbin): if isinstance(ex, pd.IntervalIndex) or (isinstance(ex, pd.Index) and not isbin): - out.append(expected_groups) + if sort: + ex = ex.sort_values() + out.append(ex) elif ex is not None: if isbin_: out.append(pd.IntervalIndex.from_arrays(ex[:-1], ex[1:])) else: + if sort: + ex = np.sort(ex) out.append(pd.Index(ex)) else: assert ex is None @@ -1465,9 +1469,7 @@ def groupby_reduce( # We convert to pd.Index since that lets us know if we are binning or not # (pd.IntervalIndex or not) - expected_groups = _convert_expected_groups_to_index(expected_groups, isbin) - if expected_groups is not None and sort: - expected_groups = expected_groups.sort_values() + expected_groups = _convert_expected_groups_to_index(expected_groups, isbin, sort) # when grouping by multiple variables, we factorize early. 
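# Reviewer sketch (not part of the diff above): the `sort` flag threaded through
# _convert_expected_groups_to_index in the patch above decides whether detected groups
# are sorted before the pd.Index is built; pd.unique alone preserves appearance order:
import numpy as np
import pandas as pd

flat = np.array([3.0, 1.0, np.nan, 2.0, 1.0])
uniques = pd.unique(flat[~np.isnan(flat)])  # array([3., 1., 2.]): appearance order
print(pd.Index(np.sort(uniques)))  # sort=True path:  1.0, 2.0, 3.0
print(pd.Index(uniques))           # sort=False path: 3.0, 1.0, 2.0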
# TODO: could restrict this to dask-only From a541926d6f577d0a69672a0985250844578130b9 Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 11:24:33 -0700 Subject: [PATCH 06/16] fix more --- flox/core.py | 5 +++-- tests/test_core.py | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index 7533f0be0..dcefc2a23 100644 --- a/flox/core.py +++ b/flox/core.py @@ -443,7 +443,7 @@ def factorize_( grp_shape = tuple(len(grp) for grp in found_groups) ngroups = np.prod(grp_shape) if len(by) > 1: - group_idx = np.ravel_multi_index(factorized, grp_shape) + group_idx = np.ravel_multi_index(factorized, grp_shape).reshape(by[0].shape) else: group_idx = factorized[0] @@ -455,7 +455,7 @@ def factorize_( # this is OK because for 3D by and axis=(1,2), # we collapse to a 2D by and axis=-1 offset_group = True - group_idx, size = offset_labels(group_idx.reshape(by[0].shape), ngroups) + group_idx, size = offset_labels(group_idx, ngroups) group_idx = group_idx.ravel() else: size = ngroups @@ -1512,6 +1512,7 @@ def groupby_reduce( "Please provide ``expected_groups`` when not reducing along all axes." ) + assert len(axis) <= by.ndim if len(axis) < by.ndim: by = _move_reduce_dims_to_end(by, -array.ndim + np.array(axis) + by.ndim) array = _move_reduce_dims_to_end(array, axis) diff --git a/tests/test_core.py b/tests/test_core.py index 7773883fa..8f8306588 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -780,7 +780,7 @@ def test_datetime_binning(): time_bins = pd.date_range(start="2010-08-01", end="2010-08-15", freq="24H") by = pd.date_range("2010-08-01", "2010-08-15", freq="15min") - (actual,) = _convert_expected_groups_to_index((time_bins,), isbin=(True,)) + (actual,) = _convert_expected_groups_to_index((time_bins,), isbin=(True,), sort=False) expected = pd.IntervalIndex.from_arrays(time_bins[:-1], time_bins[1:]) assert_equal(actual, expected) From 28a05d4c0853b7fd21c349a93b631c460c2cf2ed Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 12:24:50 -0700 Subject: [PATCH 07/16] fix even more --- flox/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index dcefc2a23..2c987fcd3 100644 --- a/flox/core.py +++ b/flox/core.py @@ -376,6 +376,7 @@ def offset_labels(labels: np.ndarray, ngroups: int) -> tuple[np.ndarray, int]: Copied from xhistogram & https://stackoverflow.com/questions/46256279/bin-elements-per-row-vectorized-2d-bincount-for-numpy """ + assert labels.ndim > 1 offset: np.ndarray = ( labels + np.arange(np.prod(labels.shape[:-1])).reshape((*labels.shape[:-1], -1)) * ngroups ) @@ -455,7 +456,7 @@ def factorize_( # this is OK because for 3D by and axis=(1,2), # we collapse to a 2D by and axis=-1 offset_group = True - group_idx, size = offset_labels(group_idx, ngroups) + group_idx, size = offset_labels(group_idx.reshape(by[0].shape), ngroups) group_idx = group_idx.ravel() else: size = ngroups From 3cbc7b610b1fe770ad33f1e1f1950ae0bdd1d7eb Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 13:26:26 -0700 Subject: [PATCH 08/16] polish --- flox/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/flox/core.py b/flox/core.py index 2c987fcd3..cc6a08207 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1498,7 +1498,6 @@ def groupby_reduce( # TODO: make sure expected_groups is unique if len(axis) == 1 and by.ndim > 1 and expected_groups is None: - # TODO: hack if not by_is_dask: expected_groups = _get_expected_groups(by, sort) else: From 1474297b6272c20e2cf044e8ef2ec840687b86f5 
Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 13:40:22 -0700 Subject: [PATCH 09/16] Further fixes --- flox/core.py | 3 ++- flox/xarray.py | 17 +++++++---------- tests/test_xarray.py | 30 ++++++++++++++++++++---------- 3 files changed, 29 insertions(+), 21 deletions(-) diff --git a/flox/core.py b/flox/core.py index cc6a08207..c5b1b4db9 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1302,7 +1302,7 @@ def _convert_expected_groups_to_index( def _lazy_factorize_wrapper(*by, **kwargs): - group_idx, _ = factorize_(by, **kwargs) + group_idx, *rest = factorize_(by, **kwargs) return group_idx @@ -1322,6 +1322,7 @@ def _factorize_multiple(by, expected_groups, by_is_dask): **kwargs, ) found_groups = tuple(None if is_duck_dask_array(b) else np.unique(b) for b in by) + grp_shape = tuple(len(e) for e in expected_groups) else: group_idx, found_groups, grp_shape = factorize_(by, **kwargs) diff --git a/flox/xarray.py b/flox/xarray.py index 341365d14..80a7105c9 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -201,10 +201,6 @@ def xarray_reduce( by: tuple[DataArray] = tuple(obj[g] if isinstance(g, str) else g for g in by) # type: ignore - # TODO: delete - if len(by) > 1 and any(is_duck_dask_array(by_.data) for by_ in by): - raise NotImplementedError("Grouping by multiple variables will compute dask variables.") - grouper_dims = tuple(itertools.chain(*tuple(g.dims for g in by))) if isinstance(obj, xr.DataArray): @@ -268,12 +264,13 @@ def xarray_reduce( "Please provide expected_groups if not grouping by a numpy-backed DataArray" ) if not isbin_: - uniques = np.unique(b.data) - nans = isnull(uniques) - if nans.any(): - uniques = uniques[~nans] - expected_groups[idx] = uniques - group_shape[idx] = len(uniques) + if expect is None: + uniques = np.unique(b.data) + nans = isnull(uniques) + if nans.any(): + uniques = uniques[~nans] + expected_groups[idx] = uniques + group_shape[idx] = len(expected_groups[idx]) else: if isinstance(expect, int): raise NotImplementedError( diff --git a/tests/test_xarray.py b/tests/test_xarray.py index 5f314fe08..ad2832065 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -100,16 +100,26 @@ def test_xarray_reduce_multiple_groupers(engine): actual = xarray_reduce(da, "labels", "labels2", func="count", engine=engine) xr.testing.assert_identical(expected, actual) - if has_dask: - with raise_if_dask_computes(): - actual = xarray_reduce( - da.chunk({"x": 2, "z": 1}), da.labels, da.labels2, func="count", engine=engine - ) - xr.testing.assert_identical(expected, actual) - - with pytest.raises(NotImplementedError): - xarray_reduce(da.chunk({"x": 2, "z": 1}), "labels", "labels2", func="count") - # xr.testing.assert_identical(expected, actual) + if not has_dask: + return + + with raise_if_dask_computes(): + actual = xarray_reduce( + da.chunk({"x": 2, "z": 1}), da.labels, da.labels2, func="count", engine=engine + ) + xr.testing.assert_identical(expected, actual) + + with pytest.raises(NotImplementedError): + xarray_reduce(da.chunk({"x": 2, "z": 1}), "labels", "labels2", func="count") + + actual = xarray_reduce( + da.chunk({"x": 2, "z": 1}), + "labels", + "labels2", + func="count", + expected_groups=(expected.labels.data, expected.labels2.data), + ) + xr.testing.assert_identical(expected, actual) @requires_dask From 872f3e4ed7e5d82d9d1e50f1abf4630c857316d2 Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 13:59:30 -0700 Subject: [PATCH 10/16] Consolidate code --- flox/core.py | 6 +++--- flox/xarray.py | 44 +++++++++++++++++++------------------------- 
tests/test_xarray.py | 4 ++-- 3 files changed, 24 insertions(+), 30 deletions(-) diff --git a/flox/core.py b/flox/core.py index c5b1b4db9..f8263e1ea 100644 --- a/flox/core.py +++ b/flox/core.py @@ -59,10 +59,10 @@ def _prepare_for_flox(group_idx, array): return group_idx, ordered_array -def _get_expected_groups(by, sort, raise_if_dask=True) -> pd.Index | None: +def _get_expected_groups(by, sort, *, raise_if_dask=True) -> pd.Index | None: if is_duck_dask_array(by): if raise_if_dask: - raise ValueError("Please provide `expected_groups`.") + raise ValueError("Please provide expected_groups if not grouping by a numpy array.") return None flatby = by.ravel() expected = pd.unique(flatby[~isnull(flatby)]) @@ -1321,7 +1321,7 @@ def _factorize_multiple(by, expected_groups, by_is_dask): meta=np.array((), dtype=np.int64), **kwargs, ) - found_groups = tuple(None if is_duck_dask_array(b) else np.unique(b) for b in by) + found_groups = tuple(None if is_duck_dask_array(b) else pd.unique(b) for b in by) grp_shape = tuple(len(e) for e in expected_groups) else: group_idx, found_groups, grp_shape = factorize_(by, **kwargs) diff --git a/flox/xarray.py b/flox/xarray.py index 80a7105c9..bf06328e9 100644 --- a/flox/xarray.py +++ b/flox/xarray.py @@ -9,11 +9,12 @@ from .aggregations import Aggregation, _atleast_1d from .core import ( + _convert_expected_groups_to_index, + _get_expected_groups, groupby_reduce, rechunk_for_blockwise as rechunk_array_for_blockwise, rechunk_for_cohorts as rechunk_array_for_cohorts, ) -from .xrutils import is_duck_dask_array, isnull if TYPE_CHECKING: from xarray import DataArray, Dataset, Resample @@ -258,30 +259,24 @@ def xarray_reduce( group_shape = [None] * len(by) expected_groups = list(expected_groups) + + # Set expected_groups and convert to index since we need coords, sizes + # for output xarray objects for idx, (b, expect, isbin_) in enumerate(zip(by, expected_groups, isbin)): - if expect is None and is_duck_dask_array(b.data): + if isbin_ and isinstance(expect, int): raise NotImplementedError( - "Please provide expected_groups if not grouping by a numpy-backed DataArray" + "flox does not support binning into an integer number of bins yet." ) - if not isbin_: - if expect is None: - uniques = np.unique(b.data) - nans = isnull(uniques) - if nans.any(): - uniques = uniques[~nans] - expected_groups[idx] = uniques - group_shape[idx] = len(expected_groups[idx]) - else: - if isinstance(expect, int): - raise NotImplementedError( - "flox does not support binning into an integer number of bins yet." + if expect is None: + if isbin_: + raise ValueError( + f"Please provided bin edges for group variable {idx} " + f"named {group_names[idx]} in expected_groups." 
) - # factorized, bins = pd.cut(by[0], bins=expected_groups[0], retbins=True) - group_shape[idx] = expect - else: - # nbins - 1 elements since expect provides the bin edges - group_shape[idx] = len(expect) - 1 + expected_groups[idx] = _get_expected_groups(b.data, sort=sort, raise_if_dask=True) + expected_groups = _convert_expected_groups_to_index(expected_groups, isbin, sort=sort) + group_shape = tuple(len(e) for e in expected_groups) group_sizes = dict(zip(group_names, group_shape)) def wrapper(array, *by, func, skipna, **kwargs): @@ -347,11 +342,10 @@ def wrapper(array, *by, func, skipna, **kwargs): if all(d not in ds[var].dims for d in dim): actual[var] = ds[var] - for name, expect, isbin_ in zip(group_names, expected_groups, isbin): - # Can't remove this till I figure out how to return groups from wrapper - # without broadcasting - if isbin_: - expect = [pd.Interval(left, right) for left, right in zip(expect[:-1], expect[1:])] + for name, expect in zip(group_names, expected_groups): + # Can't remove this till xarray handles IntervalIndex + if isinstance(expect, pd.IntervalIndex): + expect = expect.to_numpy() if isinstance(actual, xr.Dataset) and name in actual: actual = actual.drop_vars(name) actual[name] = expect diff --git a/tests/test_xarray.py b/tests/test_xarray.py index ad2832065..d32ab40ff 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -109,7 +109,7 @@ def test_xarray_reduce_multiple_groupers(engine): ) xr.testing.assert_identical(expected, actual) - with pytest.raises(NotImplementedError): + with pytest.raises(ValueError): xarray_reduce(da.chunk({"x": 2, "z": 1}), "labels", "labels2", func="count") actual = xarray_reduce( @@ -176,7 +176,7 @@ def test_xarray_reduce_errors(): xarray_reduce(da, by, func="mean", dim="foo") if has_dask: - with pytest.raises(NotImplementedError, match="provide expected_groups"): + with pytest.raises(ValueError, match="provide expected_groups"): xarray_reduce(da, by.chunk(), func="mean") From 131f87d5b51e810860a753820f88c75b46d29362 Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 17:34:21 -0700 Subject: [PATCH 11/16] consolidate tests --- tests/test_xarray.py | 47 ++++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/tests/test_xarray.py b/tests/test_xarray.py index d32ab40ff..3f6550aa5 100644 --- a/tests/test_xarray.py +++ b/tests/test_xarray.py @@ -74,9 +74,16 @@ def test_xarray_reduce(skipna, add_nan, min_count, engine, reindex): # TODO: sort -def test_xarray_reduce_multiple_groupers(engine): - arr = np.ones((4, 12)) +@pytest.mark.parametrize("pass_expected_groups", [True, False]) +@pytest.mark.parametrize("chunk", (True, False)) +def test_xarray_reduce_multiple_groupers(pass_expected_groups, chunk, engine): + if not has_dask and chunk: + pytest.skip() + if chunk and pass_expected_groups is False: + pytest.skip() + + arr = np.ones((4, 12)) labels = np.array(["a", "a", "c", "c", "c", "b", "b", "c", "c", "b", "b", "f"]) labels = np.array(labels) labels2 = np.array([1, 2, 2, 1]) @@ -85,41 +92,39 @@ def test_xarray_reduce_multiple_groupers(engine): arr, dims=("x", "y"), coords={"labels2": ("x", labels2), "labels": ("y", labels)} ).expand_dims(z=4) + if chunk: + da = da.chunk({"x": 2, "z": 1}) + expected = xr.DataArray( [[4, 4], [8, 8], [10, 10], [2, 2]], dims=("labels", "labels2"), coords={"labels": ["a", "b", "c", "f"], "labels2": [1, 2]}, ).expand_dims(z=4) - actual = xarray_reduce(da, da.labels, da.labels2, func="count", engine=engine) - 
xr.testing.assert_identical(expected, actual) + kwargs = dict(func="count", engine=engine) + if pass_expected_groups: + kwargs["expected_groups"] = (expected.labels.data, expected.labels2.data) - actual = xarray_reduce(da, "labels", da.labels2, func="count", engine=engine) + with raise_if_dask_computes(): + actual = xarray_reduce(da, da.labels, da.labels2, **kwargs) xr.testing.assert_identical(expected, actual) - actual = xarray_reduce(da, "labels", "labels2", func="count", engine=engine) + with raise_if_dask_computes(): + actual = xarray_reduce(da, "labels", da.labels2, **kwargs) xr.testing.assert_identical(expected, actual) - if not has_dask: - return - with raise_if_dask_computes(): - actual = xarray_reduce( - da.chunk({"x": 2, "z": 1}), da.labels, da.labels2, func="count", engine=engine - ) + actual = xarray_reduce(da, "labels", "labels2", **kwargs) xr.testing.assert_identical(expected, actual) - with pytest.raises(ValueError): - xarray_reduce(da.chunk({"x": 2, "z": 1}), "labels", "labels2", func="count") - actual = xarray_reduce( - da.chunk({"x": 2, "z": 1}), - "labels", - "labels2", - func="count", - expected_groups=(expected.labels.data, expected.labels2.data), +@requires_dask +def test_dask_groupers_error(): + da = xr.DataArray( + [1.0, 2.0], dims="x", coords={"labels": ("x", [1, 2]), "labels2": ("x", [1, 2])} ) - xr.testing.assert_identical(expected, actual) + with pytest.raises(ValueError): + xarray_reduce(da.chunk({"x": 2, "z": 1}), "labels", "labels2", func="count") @requires_dask From 543fdb0c98c69da9de26f4e44cd84933a328ac48 Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 22 Feb 2022 19:57:52 -0700 Subject: [PATCH 12/16] More tests --- flox/core.py | 4 +- tests/test_core.py | 92 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 72 insertions(+), 24 deletions(-) diff --git a/flox/core.py b/flox/core.py index f8263e1ea..7c8414280 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1327,7 +1327,7 @@ def _factorize_multiple(by, expected_groups, by_is_dask): group_idx, found_groups, grp_shape = factorize_(by, **kwargs) final_groups = tuple( - pd.Index(found) if expect is None else expect + found if expect is None else expect.to_numpy() for found, expect in zip(found_groups, expected_groups) ) @@ -1480,8 +1480,6 @@ def groupby_reduce( by, expected_groups, by_is_dask=by_is_dask ) expected_groups = (pd.RangeIndex(np.prod(grp_shape)),) - else: - final_groups = expected_groups assert len(by) == 1 by = by[0] diff --git a/tests/test_core.py b/tests/test_core.py index 8f8306588..d500f4cf8 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -127,10 +127,7 @@ def test_groupby_reduce( assert_equal(expected, result) -@pytest.mark.parametrize("size", ((12,), (12, 5))) -@pytest.mark.parametrize("func", ALL_FUNCS) -def test_groupby_reduce_all(size, func, engine): - +def gen_array_by(size, func): by = np.ones(size[-1]) rng = np.random.default_rng(12345) array = rng.random(size) @@ -138,9 +135,15 @@ def test_groupby_reduce_all(size, func, engine): array[[1, 4, 5], ...] 
= np.nan elif "nanarg" in func and len(size) > 1: array[[1, 4, 5], 1] = np.nan - if func in ["any", "all"]: array = array > 0.5 + return array, by + + +@pytest.mark.parametrize("size", ((12,), (12, 5))) +@pytest.mark.parametrize("func", ALL_FUNCS) +def test_groupby_reduce_all(size, func, engine): + array, by = gen_array_by(size, func) finalize_kwargs = tuple({}) if "var" in func or "std" in func: @@ -148,28 +151,75 @@ def test_groupby_reduce_all(size, func, engine): for kwargs in finalize_kwargs: with np.errstate(invalid="ignore", divide="ignore"): - expected = getattr(np, func)(array, axis=-1, **kwargs) - expected = np.expand_dims(expected, -1) + expected = getattr(np, func)(array, axis=-1, keepdims=True, **kwargs) actual, _ = groupby_reduce(array, by, func=func, engine=engine, finalize_kwargs=kwargs) if "arg" in func: assert actual.dtype.kind == "i" assert_equal(actual, expected) - if not has_dask: - continue - for method in ["map-reduce", "cohorts", "split-reduce"]: - actual, _ = groupby_reduce( - da.from_array(array, chunks=3), - by, - func=func, - method=method, - engine=engine, - finalize_kwargs=kwargs, - ) - if "arg" in func: - assert actual.dtype.kind == "i" - assert_equal(actual, expected) + +@requires_dask +@pytest.mark.parametrize("size", ((12,), (12, 5))) +@pytest.mark.parametrize("func", ALL_FUNCS) +@pytest.mark.parametrize("method", ["map-reduce", "cohorts", "split-reduce"]) +def test_groupby_reduce_all_dask(size, method, func, engine): + array, by = gen_array_by(size, func) + + finalize_kwargs = tuple({}) + if "var" in func or "std" in func: + finalize_kwargs = finalize_kwargs + ({"ddof": 1}, {"ddof": 0}) + + for kwargs in finalize_kwargs: + with np.errstate(invalid="ignore", divide="ignore"): + expected = getattr(np, func)(array, axis=-1, keepdims=True, **kwargs) + + actual, _ = groupby_reduce( + da.from_array(array, chunks=3), + by, + func=func, + method=method, + engine=engine, + finalize_kwargs=kwargs, + ) + + if "arg" in func: + assert actual.dtype.kind == "i" + assert_equal(actual, expected) + + +@pytest.mark.parametrize("chunks", [None, 3, 4]) +@pytest.mark.parametrize("nby", [2, 3]) +@pytest.mark.parametrize("size", ((12,), (12, 5))) +@pytest.mark.parametrize("func", ALL_FUNCS) +def test_groupby_reduce_all_multiple_groupers(nby, size, chunks, func, engine): + if chunks is not None and not has_dask: + pytest.skip() + + array, by = gen_array_by(size, func) + if chunks: + array = dask.array.from_array(array, chunks=chunks) + by = (by,) * nby + by = tuple(b + idx for idx, b in enumerate(by)) + + finalize_kwargs = tuple({}) + if "var" in func or "std" in func: + finalize_kwargs = finalize_kwargs + ({"ddof": 1}, {"ddof": 0}) + + for kwargs in finalize_kwargs: + with np.errstate(invalid="ignore", divide="ignore"): + expected = getattr(np, func)(array, axis=-1, **kwargs) + for _ in range(nby): + expected = np.expand_dims(expected, -1) + actual, *groups = groupby_reduce( + array, *by, func=func, engine=engine, finalize_kwargs=kwargs + ) + expected_groups = tuple(np.array([idx + 1.0]) for idx in range(nby)) + for actual_group, expect in zip(groups, expected_groups): + assert_equal(actual_group, expect) + if "arg" in func: + assert actual.dtype.kind == "i" + assert_equal(actual, expected) @requires_dask From 4efdbfbd76248e56aa3e95646e855444d2ec0161 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sun, 13 Mar 2022 19:04:58 +0530 Subject: [PATCH 13/16] Fix test --- flox/core.py | 8 +++-- tests/test_core.py | 75 +++++++++++++--------------------------------- 2 files changed, 25 
insertions(+), 58 deletions(-) diff --git a/flox/core.py b/flox/core.py index 7c8414280..246a8b46c 100644 --- a/flox/core.py +++ b/flox/core.py @@ -4,7 +4,7 @@ import itertools import operator from collections import namedtuple -from functools import partial +from functools import partial, reduce from typing import TYPE_CHECKING, Any, Callable, Dict, Mapping, Sequence, Union import numpy as np @@ -417,7 +417,7 @@ def factorize_( raise ValueError("Please pass bin edges in expected_groups.") # TODO: fix for binning found_groups.append(expect) - # pd.cut with bins = IntervalIndex[datetime64] doesn't work... + # pd.cut with bins = IntervalIndex[datetime64] doesn't work... if groupvar.dtype.kind == "M": expect = np.concatenate([expect.left.to_numpy(), [expect.right[-1].to_numpy()]]) idx = pd.cut(groupvar.ravel(), bins=expect).codes.copy() @@ -444,7 +444,9 @@ def factorize_( grp_shape = tuple(len(grp) for grp in found_groups) ngroups = np.prod(grp_shape) if len(by) > 1: - group_idx = np.ravel_multi_index(factorized, grp_shape).reshape(by[0].shape) + group_idx = np.ravel_multi_index(factorized, grp_shape, mode="wrap").reshape(by[0].shape) + nan_by_mask = reduce(np.logical_or, [np.isnan(b) for b in by]) + group_idx[nan_by_mask] = -1 else: group_idx = factorized[0] diff --git a/tests/test_core.py b/tests/test_core.py index d500f4cf8..36aeee654 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -1,3 +1,5 @@ +from functools import reduce + import numpy as np import pandas as pd import pytest @@ -140,80 +142,43 @@ def gen_array_by(size, func): return array, by -@pytest.mark.parametrize("size", ((12,), (12, 5))) -@pytest.mark.parametrize("func", ALL_FUNCS) -def test_groupby_reduce_all(size, func, engine): - array, by = gen_array_by(size, func) - - finalize_kwargs = tuple({}) - if "var" in func or "std" in func: - finalize_kwargs = finalize_kwargs + ({"ddof": 1}, {"ddof": 0}) - - for kwargs in finalize_kwargs: - with np.errstate(invalid="ignore", divide="ignore"): - expected = getattr(np, func)(array, axis=-1, keepdims=True, **kwargs) - - actual, _ = groupby_reduce(array, by, func=func, engine=engine, finalize_kwargs=kwargs) - if "arg" in func: - assert actual.dtype.kind == "i" - assert_equal(actual, expected) - - -@requires_dask -@pytest.mark.parametrize("size", ((12,), (12, 5))) -@pytest.mark.parametrize("func", ALL_FUNCS) -@pytest.mark.parametrize("method", ["map-reduce", "cohorts", "split-reduce"]) -def test_groupby_reduce_all_dask(size, method, func, engine): - array, by = gen_array_by(size, func) - - finalize_kwargs = tuple({}) - if "var" in func or "std" in func: - finalize_kwargs = finalize_kwargs + ({"ddof": 1}, {"ddof": 0}) - - for kwargs in finalize_kwargs: - with np.errstate(invalid="ignore", divide="ignore"): - expected = getattr(np, func)(array, axis=-1, keepdims=True, **kwargs) - - actual, _ = groupby_reduce( - da.from_array(array, chunks=3), - by, - func=func, - method=method, - engine=engine, - finalize_kwargs=kwargs, - ) - - if "arg" in func: - assert actual.dtype.kind == "i" - assert_equal(actual, expected) - - @pytest.mark.parametrize("chunks", [None, 3, 4]) -@pytest.mark.parametrize("nby", [2, 3]) +@pytest.mark.parametrize("nby", [1, 2, 3]) @pytest.mark.parametrize("size", ((12,), (12, 5))) +@pytest.mark.parametrize("add_nan_by", [True]) @pytest.mark.parametrize("func", ALL_FUNCS) -def test_groupby_reduce_all_multiple_groupers(nby, size, chunks, func, engine): +def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): if chunks is not None and not has_dask: 
pytest.skip() + if "arg" in func and engine == "flox": + pytest.skip() array, by = gen_array_by(size, func) if chunks: array = dask.array.from_array(array, chunks=chunks) by = (by,) * nby - by = tuple(b + idx for idx, b in enumerate(by)) - - finalize_kwargs = tuple({}) + by = [b + idx for idx, b in enumerate(by)] + if add_nan_by: + for idx in range(nby): + by[idx][2 * idx : 2 * idx + 2] = np.nan + by = tuple(by) + nanmask = reduce(np.logical_or, (np.isnan(b) for b in by)) + + finalize_kwargs = [{}] if "var" in func or "std" in func: - finalize_kwargs = finalize_kwargs + ({"ddof": 1}, {"ddof": 0}) + finalize_kwargs = finalize_kwargs + [{"ddof": 1}, {"ddof": 0}] for kwargs in finalize_kwargs: with np.errstate(invalid="ignore", divide="ignore"): - expected = getattr(np, func)(array, axis=-1, **kwargs) + expected = getattr(np, func)(array[..., ~nanmask], axis=-1, **kwargs) + for _ in range(nby): expected = np.expand_dims(expected, -1) actual, *groups = groupby_reduce( array, *by, func=func, engine=engine, finalize_kwargs=kwargs ) + assert actual.ndim == (array.ndim + nby - 1) + assert expected.ndim == (array.ndim + nby - 1) expected_groups = tuple(np.array([idx + 1.0]) for idx in range(nby)) for actual_group, expect in zip(groups, expected_groups): assert_equal(actual_group, expect) From af86cea65edde07b92631a0ddc536150a49ff632 Mon Sep 17 00:00:00 2001 From: dcherian Date: Sun, 13 Mar 2022 23:22:58 +0530 Subject: [PATCH 14/16] sq test --- flox/core.py | 2 +- tests/test_core.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index 246a8b46c..b49f50325 100644 --- a/flox/core.py +++ b/flox/core.py @@ -445,7 +445,7 @@ def factorize_( ngroups = np.prod(grp_shape) if len(by) > 1: group_idx = np.ravel_multi_index(factorized, grp_shape, mode="wrap").reshape(by[0].shape) - nan_by_mask = reduce(np.logical_or, [np.isnan(b) for b in by]) + nan_by_mask = reduce(np.logical_or, [isnull(b) for b in by]) group_idx[nan_by_mask] = -1 else: group_idx = factorized[0] diff --git a/tests/test_core.py b/tests/test_core.py index 36aeee654..3b0fb3a6e 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -144,7 +144,7 @@ def gen_array_by(size, func): @pytest.mark.parametrize("chunks", [None, 3, 4]) @pytest.mark.parametrize("nby", [1, 2, 3]) -@pytest.mark.parametrize("size", ((12,), (12, 5))) +@pytest.mark.parametrize("size", ((12,), (12, 8))) @pytest.mark.parametrize("add_nan_by", [True]) @pytest.mark.parametrize("func", ALL_FUNCS) def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): @@ -171,9 +171,9 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): for kwargs in finalize_kwargs: with np.errstate(invalid="ignore", divide="ignore"): expected = getattr(np, func)(array[..., ~nanmask], axis=-1, **kwargs) - for _ in range(nby): expected = np.expand_dims(expected, -1) + actual, *groups = groupby_reduce( array, *by, func=func, engine=engine, finalize_kwargs=kwargs ) From b40ba47989d17840677129f72470a346057a225f Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 15 Mar 2022 18:49:44 +0530 Subject: [PATCH 15/16] Fix tests --- flox/core.py | 5 ++++- tests/test_core.py | 6 ++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/flox/core.py b/flox/core.py index cd6113dd9..61ccb54ef 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1602,6 +1602,9 @@ def groupby_reduce( groups = (groups[0][sorted_idx],) if nby > 1: + # nan group labels are factorized to -1, and preserved + # now we get rid of them + nanmask = 
groups[0] == -1 groups = final_groups - result = result.reshape(result.shape[:-1] + grp_shape) + result = result[..., ~nanmask].reshape(result.shape[:-1] + grp_shape) return (result, *groups) diff --git a/tests/test_core.py b/tests/test_core.py index e3c067b3d..649b03f62 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -182,7 +182,7 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): for _ in range(nby): expected = np.expand_dims(expected, -1) - actual, *groups = groupby_reduce(array, by, **flox_kwargs) + actual, *groups = groupby_reduce(array, *by, **flox_kwargs) assert actual.ndim == (array.ndim + nby - 1) assert expected.ndim == (array.ndim + nby - 1) expected_groups = tuple(np.array([idx + 1.0]) for idx in range(nby)) @@ -197,7 +197,9 @@ def test_groupby_reduce_all(nby, size, chunks, func, add_nan_by, engine): for method in ["map-reduce", "cohorts", "split-reduce"]: if "arg" in func and method != "map-reduce": continue - actual, _ = groupby_reduce(array, by, method=method, **flox_kwargs) + actual, *groups = groupby_reduce(array, *by, method=method, **flox_kwargs) + for actual_group, expect in zip(groups, expected_groups): + assert_equal(actual_group, expect) if "arg" in func: assert actual.dtype.kind == "i" assert_equal(actual, expected) From 1c61a2223bf528bcb71260d555be38f839501186 Mon Sep 17 00:00:00 2001 From: dcherian Date: Tue, 15 Mar 2022 20:18:45 +0530 Subject: [PATCH 16/16] polish --- flox/core.py | 2 +- tests/__init__.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flox/core.py b/flox/core.py index 61ccb54ef..8b4a21ade 100644 --- a/flox/core.py +++ b/flox/core.py @@ -1317,7 +1317,7 @@ def _factorize_multiple(by, expected_groups, by_is_dask): ) if any(grp is None for grp in final_groups): - raise + raise ValueError("Please provide expected_groups when grouping by a dask array.") return (group_idx,), final_groups, grp_shape diff --git a/tests/__init__.py b/tests/__init__.py index 53f806972..fef0a778e 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -101,7 +101,7 @@ def assert_equal(a, b): np.testing.assert_allclose(a, b, equal_nan=True) -@pytest.fixture(scope="module", params=["flox"]) +@pytest.fixture(scope="module", params=["flox", "numpy", "numba"]) def engine(request): if request.param == "numba": try:
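Taken together, the series lets groupby_reduce accept multiple `by` arrays directly.
A minimal end-to-end sketch against this branch (illustrative values, not from the
diffs above):

    import numpy as np
    from flox.core import groupby_reduce

    array = np.ones((2, 12))
    by1 = np.repeat(np.array(["a", "b", "c"]), 4)  # 3 labels along the last axis
    by2 = np.tile(np.array([10, 20]), 6)           # 2 labels along the last axis
    result, g1, g2 = groupby_reduce(array, by1, by2, func="sum")
    print(result.shape)  # (2, 3, 2): one slot per (by1, by2) pair
    print(g1, g2)        # ['a' 'b' 'c'] [10 20]

Labels that are NaN in any `by` factorize to -1 and are dropped from the reshaped
output (patches 13 and 15); dask-backed `by` arrays still require passing
expected_groups explicitly (patch 16's error message).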